You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/24 18:19:54 UTC

[1/6] flink git commit: [FLINK-4917] [streaming api] Deprecate "CheckpointedAsynchronously" interface

Repository: flink
Updated Branches:
  refs/heads/master acfeeaf5e -> 84064c969


[FLINK-4917] [streaming api] Deprecate "CheckpointedAsynchronously" interface

This closes #3087


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d587ff8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d587ff8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d587ff8

Branch: refs/heads/master
Commit: 0d587ff861dbed10dd487886aab2eb83d8c2a443
Parents: e7cda75
Author: mtunique <oa...@gmail.com>
Authored: Tue Jan 10 23:05:44 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 24 17:20:11 2017 +0100

----------------------------------------------------------------------
 .../api/checkpoint/CheckpointedAsynchronously.java   | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0d587ff8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
index 8ad5ad0..6fcc1d5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
@@ -36,6 +36,21 @@ import java.io.Serializable;
  * <p>To be able to support asynchronous snapshots, the state returned by the
  * {@link #snapshotState(long, long)} method is typically a copy or shadow copy
  * of the actual state.</p>
+ * @deprecated Please use {@link ListCheckpointed} and {@link CheckpointedFunction}.
+ *
+ * The short cut replacement via {@link ListCheckpointed}
+ * <pre>{@code
+ *  public class ExampleOperator implements ListCheckpointed<Integer> {
+ *
+ *		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ *			 return Collections.singletonList(this.value);
+ *		}
+ *
+ *		public void restoreState(List<Integer> state) throws Exception {
+ *			this.value = state.get(0);
+ *		}
+ * }</pre>
  */
+@Deprecated
 @PublicEvolving
 public interface CheckpointedAsynchronously<T extends Serializable> extends Checkpointed<T> {}


[5/6] flink git commit: [hotfix] [streaming api] Improve JavaDocs of the user-facing checkpointing and state interfaces

Posted by se...@apache.org.
[hotfix] [streaming api] Improve JavaDocs of the user-facing checkpointing and state interfaces


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c69f366c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c69f366c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c69f366c

Branch: refs/heads/master
Commit: c69f366cea0a537fed4ecfbc7e9f861453ce9f2f
Parents: 0d587ff
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jan 23 19:07:54 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 24 17:20:12 2017 +0100

----------------------------------------------------------------------
 .../runtime/state/ManagedSnapshotContext.java   |  11 +-
 .../streaming/api/checkpoint/Checkpointed.java  |  35 ++++-
 .../checkpoint/CheckpointedAsynchronously.java  |  48 ++++---
 .../api/checkpoint/CheckpointedFunction.java    | 133 +++++++++++++++++--
 .../api/checkpoint/ListCheckpointed.java        | 125 +++++++++++++++--
 5 files changed, 297 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c69f366c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java
index 14156a6..de65c5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java
@@ -29,13 +29,18 @@ import org.apache.flink.annotation.PublicEvolving;
 public interface ManagedSnapshotContext {
 
 	/**
-	 * Returns the Id of the checkpoint for which the snapshot is taken.
+	 * Returns the ID of the checkpoint for which the snapshot is taken.
+	 * 
+	 * <p>The checkpoint ID is guaranteed to be strictly monotonically increasing across checkpoints.
+	 * For two completed checkpoints <i>A</i> and <i>B</i>, {@code ID_B > ID_A} means that checkpoint
+	 * <i>B</i> subsumes checkpoint <i>A</i>, i.e., checkpoint <i>B</i> contains a later state
+	 * than checkpoint <i>A</i>.
 	 */
 	long getCheckpointId();
 
 	/**
-	 * Returns the timestamp of the checkpoint for which the snapshot is taken.
+	 * Returns the timestamp (wall clock time) when the master node triggered the checkpoint for which
+	 * the state snapshot is taken.
 	 */
 	long getCheckpointTimestamp();
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c69f366c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
index fb67ea7..dd93462 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
@@ -27,13 +27,38 @@ import java.io.Serializable;
  * checkpointed. The functions get a call whenever a checkpoint should take place
  * and return a snapshot of their state, which will be checkpointed.
  * 
- * <p>This interface marks a function as <i>synchronously</i> checkpointed. While the
- * state is written, the function is not called, so the function needs not return a
- * copy of its state, but may return a reference to its state. Functions that can
- * continue to work and mutate the state, even while the state snapshot is being accessed,
- * can implement the {@link CheckpointedAsynchronously} interface.</p>
+ * <h1>Deprecation and Replacement</h1>
+ *
+ * The short cut replacement for this interface is via {@link ListCheckpointed} and works
+ * as shown in the example below. The {@code ListCheckpointed} interface returns a list of
+ * elements (redistributable, atomic sub-states) rather than a single state object.
+ * 
+ * 
+ *
+ * <pre>{@code
+ * public class ExampleFunction<T> implements MapFunction<T, T>, ListCheckpointed<Integer> {
+ *
+ *     private int count;
+ *
+ *     public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ *         return Collections.singletonList(this.count);
+ *     }
+ *
+ *     public void restoreState(List<Integer> state) throws Exception {
+ *         this.count = state.isEmpty() ? 0 : state.get(0);
+ *     }
+ *
+ *     public T map(T value) {
+ *         count++;
+ *         return value;
+ *     }
+ * }
+ * }</pre>
  * 
  * @param <T> The type of the operator state.
+ * 
+ * @deprecated Please use {@link ListCheckpointed} as illustrated above, or
+ *             {@link CheckpointedFunction} for more control over the checkpointing process.
  */
 @Deprecated
 @PublicEvolving

http://git-wip-us.apache.org/repos/asf/flink/blob/c69f366c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
index 6fcc1d5..4bafd90 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
@@ -18,38 +18,44 @@
 
 package org.apache.flink.streaming.api.checkpoint;
 
-
 import org.apache.flink.annotation.PublicEvolving;
 
 import java.io.Serializable;
 
 /**
- * This interface marks a function/operator as <i>asynchronously checkpointed</i>.
- * Similar to the {@link Checkpointed} interface, the function must produce a
- * snapshot of its state. However, the function must be able to continue working
- * and mutating its state without mutating the returned state snapshot.
+ * This interface marks a function/operator as checkpointed similar to the
+ * {@link Checkpointed} interface, but gives the Flink framework the option to
+ * perform the checkpoint asynchronously. Note that asynchronous checkpointing for 
+ * this interface has not been implemented.
  * 
- * <p>Asynchronous checkpoints are desirable, because they allow the data streams at the
- * point of the checkpointed function/operator to continue running while the checkpoint
- * is in progress.</p>
+ * <h1>Deprecation and Replacement</h1>
+ * 
+ * The shortcut replacement for this interface is via {@link ListCheckpointed} and works
+ * as shown in the example below. Please refer to the JavaDocs of {@link ListCheckpointed} for
+ * a more detailed description of how to use the new interface.
  * 
- * <p>To be able to support asynchronous snapshots, the state returned by the
- * {@link #snapshotState(long, long)} method is typically a copy or shadow copy
- * of the actual state.</p>
- * @deprecated Please use {@link ListCheckpointed} and {@link CheckpointedFunction}.
- *
- * The short cut replacement via {@link ListCheckpointed}
  * <pre>{@code
- *  public class ExampleOperator implements ListCheckpointed<Integer> {
+ * public class ExampleFunction<T> implements MapFunction<T, T>, ListCheckpointed<Integer> {
+ * 
+ *     private int count;
  *
- *		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
- *			 return Collections.singletonList(this.value);
- *		}
+ *     public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ *         return Collections.singletonList(this.count);
+ *     }
  *
- *		public void restoreState(List<Integer> state) throws Exception {
- *			this.value = state.get(0);
- *		}
+ *     public void restoreState(List<Integer> state) throws Exception {
+ *         this.count = state.isEmpty() ? 0 : state.get(0);
+ *     }
+ * 
+ *     public T map(T value) {
+ *         count++;
+ *         return value;
+ *     }
+ * }
  * }</pre>
+ *
+ * @deprecated Please use {@link ListCheckpointed} and {@link CheckpointedFunction} instead,
+ *             as illustrated in the example above.
  */
 @Deprecated
 @PublicEvolving

http://git-wip-us.apache.org/repos/asf/flink/blob/c69f366c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
index 37d8244..51ac5db 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
@@ -19,29 +19,135 @@
 package org.apache.flink.streaming.api.checkpoint;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 
 /**
+ * This is the core interface for <i>stateful transformation functions</i>, meaning functions
+ * that maintain state across individual stream records.
+ * While more lightweight interfaces exist as shortcuts for various types of state, this interface offers the
+ * greatest flexibility in managing both <i>keyed state</i> and <i>operator state</i>.
+ * 
+ * <p>The section <a href="#shortcuts">Shortcuts</a> illustrates the common lightweight
+ * ways to set up stateful functions typically used instead of the full-fledged
+ * abstraction represented by this interface.
+ * 
+ * <h1>Initialization</h1>
+ * 
+ * The {@link CheckpointedFunction#initializeState(FunctionInitializationContext)} is called when
+ * the parallel instance of the transformation function is created during distributed execution.
+ * The method gives access to the {@link FunctionInitializationContext} which in turn gives access
+ * to the {@link OperatorStateStore} and {@link KeyedStateStore}.
+ * 
+ * <p>The {@code OperatorStateStore} and {@code KeyedStateStore} give access to the data structures
+ * in which state should be stored for Flink to transparently manage and checkpoint it, such as
+ * {@link org.apache.flink.api.common.state.ValueState} or {@link org.apache.flink.api.common.state.ListState}.
+ *
+ * <p><i>Note:</i> The {@code KeyedStateStore} can only be used when the transformation supports
+ * <i>keyed state</i>, i.e., when it is applied on a keyed stream (after a {@code keyBy(...)}).
+ * 
+ * <h1>Snapshot</h1>
+ * 
+ * The {@link CheckpointedFunction#snapshotState(FunctionSnapshotContext)} is called whenever a
+ * checkpoint takes a state snapshot of the transformation function. Inside this method, functions typically
+ * make sure that the checkpointed data structures (obtained in the initialization phase) are up
+ * to date for a snapshot to be taken. The given snapshot context gives access to the metadata
+ * of the checkpoint.
+ * 
+ * <p>In addition, functions can use this method as a hook to flush/commit/synchronize with
+ * external systems.
+ *
+ * <h1>Example</h1>
+ * 
+ * The code example below illustrates how to use this interface for a function that keeps counts
+ * of events per key and per parallel partition (parallel instance of the transformation function
+ * during distributed execution).
+ * The example also handles changes of parallelism, which affect the count-per-parallel-partition by
+ * adding up the counters of partitions that get merged on scale-down. Note that this is a
+ * toy example, but should illustrate the basic skeleton for a stateful function.
+ * 
+ * <pre>{@code
+ * public class MyFunction<T> implements MapFunction<T, T>, CheckpointedFunction {
+ *
+ *     private ReducingState<Long> countPerKey;
+ *     private ListState<Long> countPerPartition;
+ *
+ *     private long localCount;
+ *
+ *     public void initializeState(FunctionInitializationContext context) throws Exception {
+ *         // get the state data structure for the per-key state
+ *         countPerKey = context.getKeyedStateStore().getReducingState(
+ *                 new ReducingStateDescriptor<>("perKeyCount", new AddFunction<>(), Long.class));
+ *
+ *         // get the state data structure for the per-partition state
+ *         countPerPartition = context.getOperatorStateStore().getOperatorState(
+ *                 new ListStateDescriptor<>("perPartitionCount", Long.class));
  *
- * Similar to @{@link Checkpointed}, this interface must be implemented by functions that have potentially
- * repartitionable state that needs to be checkpointed. Methods from this interface are called upon checkpointing and
- * initialization of state.
+ *         // initialize the "local count variable" based on the operator state
+ *         for (Long l : countPerPartition.get()) {
+ *             localCount += l;
+ *         }
+ *     }
  *
- * On {@link #initializeState(FunctionInitializationContext)} the implementing class receives a
- * {@link FunctionInitializationContext} which provides access to the {@link OperatorStateStore} (all) and
- * {@link org.apache.flink.api.common.state.KeyedStateStore} (only for keyed operators). Those allow to register
- * managed operator / keyed  user states. Furthermore, the context provides information whether or the operator was
- * restored.
+ *     public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ *         // the keyed state is always up to date anyways
+ *         // just bring the per-partition state in shape
+ *         countPerPartition.clear();
+ *         countPerPartition.add(localCount);
+ *     }
  *
+ *     public T map(T value) throws Exception {
+ *         // update the states
+ *         countPerKey.add(1L);
+ *         localCount++;
  *
- * In {@link #snapshotState(FunctionSnapshotContext)} the implementing class must ensure that all operator / keyed state
- * is passed to user states that have been registered during initialization, so that it is visible to the system
- * backends for checkpointing.
+ *         return value;
+ *     }
+ * }
+ * }</pre>
+ * 
+ * <hr>
+ * 
+ * <h1><a name="shortcuts">Shortcuts</a></h1>
+ * 
+ * There are various ways that transformation functions can use state without implementing the
+ * full-fledged {@code CheckpointedFunction} interface:
+ * 
+ * <h4>Operator State</h4>
  *
+ * Checkpointing some state that is part of the function object itself is possible in a simpler way
+ * by directly implementing the {@link ListCheckpointed} interface. 
+ * That mechanism is similar to the previously used {@link Checkpointed} interface.
+ * 
+ * <h4>Keyed State</h4>
+ * 
+ * Access to keyed state is possible via the {@link RuntimeContext}'s methods:
+ * <pre>{@code
+ * public class CountPerKeyFunction<T> extends RichMapFunction<T, T> {
+ * 
+ *     private ValueState<Long> count;
+ *     
+ *     public void open(Configuration cfg) throws Exception {
+ *         count = getRuntimeContext().getState(new ValueStateDescriptor<>("myCount", Long.class));
+ *     }
+ *     
+ *     public T map(T value) throws Exception {
+ *         Long current = count.get();
+ *         count.update(current == null ? 1L : current + 1);
+ *         
+ *         return value;
+ *     }
+ * }
+ * }</pre>
+ * 
+ * @see ListCheckpointed
+ * @see RuntimeContext
  */
 @PublicEvolving
+@SuppressWarnings("deprecation")
 public interface CheckpointedFunction {
 
 	/**
@@ -55,9 +161,8 @@ public interface CheckpointedFunction {
 	void snapshotState(FunctionSnapshotContext context) throws Exception;
 
 	/**
-	 * This method is called when an operator is initialized, so that the function can set up it's state through
-	 * the provided context. Initialization typically includes registering user states through the state stores
-	 * that the context offers.
+	 * This method is called when the parallel function instance is created during distributed
+	 * execution. Functions typically set up their state storing data structures in this method.
 	 *
 	 * @param context the context for initializing the operator
 	 * @throws Exception

http://git-wip-us.apache.org/repos/asf/flink/blob/c69f366c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
index 5e85dc1..84a9700 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
@@ -19,29 +19,118 @@
 package org.apache.flink.streaming.api.checkpoint;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.configuration.Configuration;
 
 import java.io.Serializable;
 import java.util.List;
 
 /**
- * This method must be implemented by functions that have state that needs to be
- * checkpointed. The functions get a call whenever a checkpoint should take place
- * and return a snapshot of their state as a list of redistributable sub-states,
- * which will be checkpointed.
+ * This interface can be implemented by functions that want to store state in checkpoints.
+ * It can be used in a similar way as the deprecated {@link Checkpointed} interface, but
+ * supports <b>list-style state redistribution</b> for cases when the parallelism of the
+ * transformation is changed.
  *
+ * <p>Implementing this interface is a shortcut for obtaining the default {@code ListState}
+ * from the {@link OperatorStateStore}. Using the {@code OperatorStateStore} directly gives
+ * more flexible options to use operator state, for example controlling the serialization
+ * of the state objects, or have multiple named states.
+ * 
+ * <h2>State Redistribution</h2>
+ * 
+ * State redistribution happens when the parallelism of the operator is changed.
+ * State redistribution of <i>operator state</i> (to which category the state handled by this
+ * interface belongs) always goes through a checkpoint, so it appears
+ * to the transformation functions like a failure/recovery combination, where recovery happens
+ * with a different parallelism.
+ * 
+ * <p>Conceptually, the state in the checkpoint is the concatenated list of all lists
+ * returned by the parallel transformation function instances. When restoring from a checkpoint,
+ * the list is divided into sub-lists that are assigned to each parallel function instance. 
+ * 
+ * <p>The following sketch illustrates the state redistribution. The function runs with parallelism
+ * <i>3</i>. The first two parallel instances of the function return lists with two state elements,
+ * the third one a list with one element.  
+ * <pre>
+ *    func_1        func_2     func_3
+ * +----+----+   +----+----+   +----+
+ * | S1 | S2 |   | S3 | S4 |   | S5 |
+ * +----+----+   +----+----+   +----+
+ * </pre>
+ * 
+ * Recovering the checkpoint with <i>parallelism = 5</i> yields the following state assignment:
+ * <pre>
+ * func_1   func_2   func_3   func_4   func_5
+ * +----+   +----+   +----+   +----+   +----+
+ * | S1 |   | S2 |   | S3 |   | S4 |   | S5 |
+ * +----+   +----+   +----+   +----+   +----+
+ * </pre>
+ *
+ * Recovering the checkpoint with <i>parallelism = 2</i> yields the following state assignment:
+ * <pre>
+ *      func_1          func_2
+ * +----+----+----+   +----+----+
+ * | S1 | S2 | S3 |   | S4 | S5 |
+ * +----+----+----+   +----+----+
+ * </pre>
+ * 
+ * <h2>Example</h2>
+ * 
+ * The following example illustrates how to implement a {@code MapFunction} that counts all elements
+ * passing through it, keeping the total count accurate under re-scaling (changes of parallelism).
+ * 
+ * <pre>{@code
+ * public class CountingFunction<T> implements MapFunction<T, Tuple2<T, Long>>, ListCheckpointed<Long> {
+ * 
+ *     // this count is the number of elements in the parallel subtask 
+ *     private long count;
+ *
+ *     {@literal @}Override
+ *     public List<Long> snapshotState(long checkpointId, long timestamp) {
+ *         // return a single element - our count
+ *         return Collections.singletonList(count);
+ *     }
+ * 
+ *     {@literal @}Override
+ *     public void restoreState(List<Long> state) throws Exception {
+ *         // in case of scale in, this adds up counters from different original subtasks
+ *         // in case of scale out, this list may be empty
+ *         for (Long l : state) {
+ *             count += l;
+ *         }
+ *     }
+ *
+ *     {@literal @}Override
+ *     public Tuple2<T, Long> map(T value) {
+ *         count++;
+ *         return new Tuple2<>(value, count);
+ *     }
+ * }
+ * }</pre>
+ * 
  * @param <T> The type of the operator state.
  */
 @PublicEvolving
 public interface ListCheckpointed<T extends Serializable> {
 
 	/**
-	 * Gets the current state of the function of operator. The state must reflect the result of all
-	 * prior invocations to this function.
+	 * Gets the current state of the function. The state must reflect the result of all prior
+	 * invocations to this function.
+	 * 
+	 * <p>The returned list should contain one entry for each redistributable unit of state. See
+	 * the {@link ListCheckpointed class docs} for an illustration how list-style state
+	 * redistribution works.
+	 * 
+	 * <p> As special case, the returned list may be null or empty (if the operator has no state)
+	 * or it may contain a single element (if the operator state is indivisible).
 	 *
-	 * @param checkpointId The ID of the checkpoint.
-	 * @param timestamp Timestamp of the checkpoint.
+	 * @param checkpointId The ID of the checkpoint - a unique and monotonically increasing value.
+	 * @param timestamp The wall clock timestamp when the checkpoint was triggered by the master.
+	 *                     
 	 * @return The operator state in a list of redistributable, atomic sub-states.
 	 *         Should not return null, but empty list instead.
+	 *         
 	 * @throws Exception Thrown if the creation of the state object failed. This causes the
 	 *                   checkpoint to fail. The system may decide to fail the operation (and trigger
 	 *                   recovery), or to discard this checkpoint attempt and to continue running
@@ -51,11 +140,23 @@ public interface ListCheckpointed<T extends Serializable> {
 
 	/**
 	 * Restores the state of the function or operator to that of a previous checkpoint.
-	 * This method is invoked when a function is executed as part of a recovery run.
-	 * <p>
-	 * Note that restoreState() is called before open().
-	 *
+	 * This method is invoked when the function is executed after a failure recovery.
+	 * The state list may be empty if no state is to be recovered by the particular parallel instance
+	 * of the function.
+	 * 
+	 * <p>The given state list will contain all the <i>sub states</i> that this parallel
+	 * instance of the function needs to handle. Refer to the  {@link ListCheckpointed class docs}
+	 * for an illustration how list-style state redistribution works.
+	 * 
+	 * <p><b>Important:</b> When implementing this interface together with {@link RichFunction},
+	 * then the {@code restoreState()} method is called before {@link RichFunction#open(Configuration)}.
+	 * 
 	 * @param state The state to be restored as a list of atomic sub-states.
+	 * 
+	 * @throws Exception Throwing an exception in this method causes the recovery to fail.
+	 *                   The exact consequence depends on the configured failure handling strategy,
+	 *                   but typically the system will re-attempt the recovery, or try recovering
+	 *                   from a different checkpoint.
 	 */
 	void restoreState(List<T> state) throws Exception;
 }


[2/6] flink git commit: [FLINK-5247] [streaming api] Fix checks for allowed lateness in windowed streams

Posted by se...@apache.org.
[FLINK-5247] [streaming api] Fix checks for allowed lateness in windowed streams

Also, fix outdated documentation.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87af8419
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87af8419
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87af8419

Branch: refs/heads/master
Commit: 87af84194911eb1e0c3b3a894bb3f04b628fbf11
Parents: acfeeaf
Author: Rohit Agarwal <mi...@gmail.com>
Authored: Sat Dec 3 12:15:45 2016 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 24 17:20:11 2017 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/datastream/AllWindowedStream.java      | 6 ++----
 .../apache/flink/streaming/api/datastream/WindowedStream.java  | 6 ++----
 .../apache/flink/streaming/api/scala/AllWindowedStream.scala   | 2 +-
 .../org/apache/flink/streaming/api/scala/WindowedStream.scala  | 2 +-
 4 files changed, 6 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/87af8419/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 31ea001..bd11de3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -123,11 +123,9 @@ public class AllWindowedStream<T, W extends Window> {
 	@PublicEvolving
 	public AllWindowedStream<T, W> allowedLateness(Time lateness) {
 		long millis = lateness.toMilliseconds();
-		if (allowedLateness < 0) {
+		if (millis < 0) {
 			throw new IllegalArgumentException("The allowed lateness cannot be negative.");
-		} else if (allowedLateness != 0 && !windowAssigner.isEventTime()) {
-			throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows.");
-		} else {
+		} else if (windowAssigner.isEventTime()) {
 			this.allowedLateness = millis;
 		}
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/87af8419/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 0a02885..c360ea1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -146,11 +146,9 @@ public class WindowedStream<T, K, W extends Window> {
 	@PublicEvolving
 	public WindowedStream<T, K, W> allowedLateness(Time lateness) {
 		long millis = lateness.toMilliseconds();
-		if (allowedLateness < 0) {
+		if (millis < 0) {
 			throw new IllegalArgumentException("The allowed lateness cannot be negative.");
-		} else if (allowedLateness != 0 && !windowAssigner.isEventTime()) {
-			throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows.");
-		} else {
+		} else if (windowAssigner.isEventTime()) {
 			this.allowedLateness = millis;
 		}
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/87af8419/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 83104e8..324689a 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -58,7 +58,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
 
   /**
     * Sets the allowed lateness to a user-specified value.
-    * If not explicitly set, the allowed lateness is [[Long.MaxValue]].
+    * If not explicitly set, the allowed lateness is [[0L]].
     * Setting the allowed lateness is only valid for event-time windows.
     * If a value different than 0 is provided with a processing-time
     * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]],

http://git-wip-us.apache.org/repos/asf/flink/blob/87af8419/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 76d9cda..db187ea 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -61,7 +61,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
 
   /**
     * Sets the allowed lateness to a user-specified value.
-    * If not explicitly set, the allowed lateness is [[Long.MaxValue]].
+    * If not explicitly set, the allowed lateness is [[0L]].
     * Setting the allowed lateness is only valid for event-time windows.
     * If a value different than 0 is provided with a processing-time
     * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]],


[6/6] flink git commit: [FLINK-5577] [yarn] Fix application id growth by reusing Yarn Client

Posted by se...@apache.org.
[FLINK-5577] [yarn] Fix application id growth by reusing Yarn Client

This closes #3173


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84064c96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84064c96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84064c96

Branch: refs/heads/master
Commit: 84064c969a31fe44579b4b1da588d5b117eb4c7d
Parents: c69f366
Author: fengyelei <fe...@huawei.com>
Authored: Thu Jan 19 23:27:11 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 24 17:20:12 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/yarn/AbstractYarnClusterDescriptor.java   | 6 ++----
 .../main/java/org/apache/flink/yarn/YarnClusterClientV2.java   | 4 +++-
 2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/84064c96/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 6d54c5e..b537e09 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -499,7 +499,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// Create application via yarnClient
 		final YarnClientApplication yarnApplication = yarnClient.createApplication();
 		GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
-		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
 
 		Resource maxRes = appResponse.getMaximumResourceCapability();
 		final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
@@ -555,7 +554,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 		}
 
-		ApplicationReport report = startAppMaster(null, yarnClient);
+		ApplicationReport report = startAppMaster(null, yarnClient, yarnApplication);
 
 		String host = report.getHost();
 		int port = report.getRpcPort();
@@ -568,7 +567,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir, true);
 	}
 
-	public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient) throws Exception {
+	public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient, YarnClientApplication yarnApplication) throws Exception {
 
 		// ------------------ Set default file system scheme -------------------------
 
@@ -592,7 +591,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 					+ "The Flink YARN client needs to store its files in a distributed file system");
 		}
 
-		final YarnClientApplication yarnApplication = yarnClient.createApplication();
 		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
 		Set<File> effectiveShipFiles = new HashSet<>(shipFiles.size());
 		for (File file : shipFiles) {

http://git-wip-us.apache.org/repos/asf/flink/blob/84064c96/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
index e9c6636..5a3a3c0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,7 +91,8 @@ public class YarnClusterClientV2 extends ClusterClient {
 	protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
 		try {
 			// Create application via yarnClient
-			ApplicationReport report = this.clusterDescriptor.startAppMaster(jobGraph, yarnClient);
+			final YarnClientApplication yarnApplication = yarnClient.createApplication();
+			ApplicationReport report = this.clusterDescriptor.startAppMaster(jobGraph, yarnClient, yarnApplication);
 			if (report.getYarnApplicationState().equals(YarnApplicationState.RUNNING)) {
 				appId = report.getApplicationId();
 				trackingURL = report.getTrackingUrl();


[4/6] flink git commit: [hotfix] [streaming api] Minor cleanup in WindowedStream and AllWindowedStream

Posted by se...@apache.org.
[hotfix] [streaming api] Minor cleanup in WindowedStream and AllWindowedStream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6342d6db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6342d6db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6342d6db

Branch: refs/heads/master
Commit: 6342d6db1de5f38a921732e35abd83e6c5b9305a
Parents: 87af841
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jan 23 14:55:48 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 24 17:20:11 2017 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/AllWindowedStream.java     | 12 ++++++------
 .../flink/streaming/api/datastream/WindowedStream.java  | 11 +++++------
 2 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6342d6db/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index bd11de3..5de1774 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -53,6 +53,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * A {@code AllWindowedStream} represents a data stream where the stream of
  * elements is split into windows based on a
@@ -122,12 +124,10 @@ public class AllWindowedStream<T, W extends Window> {
 	 */
 	@PublicEvolving
 	public AllWindowedStream<T, W> allowedLateness(Time lateness) {
-		long millis = lateness.toMilliseconds();
-		if (millis < 0) {
-			throw new IllegalArgumentException("The allowed lateness cannot be negative.");
-		} else if (windowAssigner.isEventTime()) {
-			this.allowedLateness = millis;
-		}
+		final long millis = lateness.toMilliseconds();
+		checkArgument(millis >= 0, "The allowed lateness cannot be negative.");
+
+		this.allowedLateness = millis;
 		return this;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6342d6db/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index c360ea1..c74bad7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -66,6 +66,7 @@ import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -145,12 +146,10 @@ public class WindowedStream<T, K, W extends Window> {
 	 */
 	@PublicEvolving
 	public WindowedStream<T, K, W> allowedLateness(Time lateness) {
-		long millis = lateness.toMilliseconds();
-		if (millis < 0) {
-			throw new IllegalArgumentException("The allowed lateness cannot be negative.");
-		} else if (windowAssigner.isEventTime()) {
-			this.allowedLateness = millis;
-		}
+		final long millis = lateness.toMilliseconds();
+		checkArgument(millis >= 0, "The allowed lateness cannot be negative.");
+
+		this.allowedLateness = millis;
 		return this;
 	}
 


[3/6] flink git commit: [FLINK-4905] [kafka 08 consumer] Suppress offset committing failures when fetcher is shutting down

Posted by se...@apache.org.
[FLINK-4905] [kafka 08 consumer] Suppress offset committing failures when fetcher is shutting down

This closes #3035


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7cda75b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7cda75b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7cda75b

Branch: refs/heads/master
Commit: e7cda75b8594417559d6aac6229b5893f5459f0f
Parents: 6342d6d
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jan 23 16:28:54 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 24 17:20:11 2017 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer08.java         |  7 ++++---
 .../connectors/kafka/internals/Kafka08Fetcher.java     | 13 +++++++++++--
 .../streaming/connectors/kafka/Kafka08ITCase.java      |  7 -------
 3 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e7cda75b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 0aacccd..0f11c72 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -292,11 +292,12 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 						}
 					}
 					break retryLoop; // leave the loop through the brokers
-				} catch (Exception e) {
+				}
+				catch (Exception e) {
 					//validates seed brokers in case of a ClosedChannelException
 					validateSeedBrokers(seedBrokers, e);
-					LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString() + "." +
-							"" + e.getClass() + ". Message: " + e.getMessage());
+					LOG.warn("Error communicating with broker {} to find partitions for {}. {} Message: {}",
+							seedBroker, topics, e.getClass().getName(), e.getMessage());
 					LOG.debug("Detailed trace", e);
 					// we sleep a bit. Retrying immediately doesn't make sense in cases where Kafka is reorganizing the leader metadata
 					try {

http://git-wip-us.apache.org/repos/asf/flink/blob/e7cda75b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index d015157..5a0aed3 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -347,8 +347,17 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
 		ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
 		if (zkHandler != null) {
-			// the ZK handler takes care of incrementing the offsets by 1 before committing
-			zkHandler.prepareAndCommitOffsets(offsets);
+			try {
+				// the ZK handler takes care of incrementing the offsets by 1 before committing
+				zkHandler.prepareAndCommitOffsets(offsets);
+			}
+			catch (Exception e) {
+				if (running) {
+					throw e;
+				} else {
+					return;
+				}
+			}
 		}
 
 		// Set committed offsets in topic partition state

http://git-wip-us.apache.org/repos/asf/flink/blob/e7cda75b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index fabb0fe..0cdf465 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -19,19 +19,12 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.runtime.client.JobCancellationException;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;