You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/24 18:19:54 UTC
[1/6] flink git commit: [FLINK-4917] [streaming api] Deprecate
"CheckpointedAsynchronously" interface
Repository: flink
Updated Branches:
refs/heads/master acfeeaf5e -> 84064c969
[FLINK-4917] [streaming api] Deprecate "CheckpointedAsynchronously" interface
This closes #3087
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d587ff8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d587ff8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d587ff8
Branch: refs/heads/master
Commit: 0d587ff861dbed10dd487886aab2eb83d8c2a443
Parents: e7cda75
Author: mtunique <oa...@gmail.com>
Authored: Tue Jan 10 23:05:44 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 24 17:20:11 2017 +0100
----------------------------------------------------------------------
.../api/checkpoint/CheckpointedAsynchronously.java | 15 +++++++++++++++
1 file changed, 15 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0d587ff8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
index 8ad5ad0..6fcc1d5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
@@ -36,6 +36,21 @@ import java.io.Serializable;
* <p>To be able to support asynchronous snapshots, the state returned by the
* {@link #snapshotState(long, long)} method is typically a copy or shadow copy
* of the actual state.</p>
+ * @deprecated Please use {@link ListCheckpointed} and {@link CheckpointedFunction}.
+ *
+ * The short cut replacement via {@link ListCheckpointed}
+ * <pre>{@code
+ * public class ExampleOperator implements ListCheckpointed<Integer> {
+ *
+ * public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ * return Collections.singletonList(this.value);
+ * }
+ *
+ * public void restoreState(List<Integer> state) throws Exception {
+ * this.value = state.get(0);
+ * }
+ * }</pre>
*/
+@Deprecated
@PublicEvolving
public interface CheckpointedAsynchronously<T extends Serializable> extends Checkpointed<T> {}
[5/6] flink git commit: [hotfix] [streaming api] Improve JavaDocs of
the user-facing checkpointing and state interfaces
Posted by se...@apache.org.
[hotfix] [streaming api] Improve JavaDocs of the user-facing checkpointing and state interfaces
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c69f366c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c69f366c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c69f366c
Branch: refs/heads/master
Commit: c69f366cea0a537fed4ecfbc7e9f861453ce9f2f
Parents: 0d587ff
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jan 23 19:07:54 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 24 17:20:12 2017 +0100
----------------------------------------------------------------------
.../runtime/state/ManagedSnapshotContext.java | 11 +-
.../streaming/api/checkpoint/Checkpointed.java | 35 ++++-
.../checkpoint/CheckpointedAsynchronously.java | 48 ++++---
.../api/checkpoint/CheckpointedFunction.java | 133 +++++++++++++++++--
.../api/checkpoint/ListCheckpointed.java | 125 +++++++++++++++--
5 files changed, 297 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c69f366c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java
index 14156a6..de65c5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java
@@ -29,13 +29,18 @@ import org.apache.flink.annotation.PublicEvolving;
public interface ManagedSnapshotContext {
/**
- * Returns the Id of the checkpoint for which the snapshot is taken.
+ * Returns the ID of the checkpoint for which the snapshot is taken.
+ *
+ * <p>The checkpoint ID is guaranteed to be strictly monotonically increasing across checkpoints.
+ * For two completed checkpoints <i>A</i> and <i>B</i>, {@code ID_B > ID_A} means that checkpoint
+ * <i>B</i> subsumes checkpoint <i>A</i>, i.e., checkpoint <i>B</i> contains a later state
+ * than checkpoint <i>A</i>.
*/
long getCheckpointId();
/**
- * Returns the timestamp of the checkpoint for which the snapshot is taken.
+ * Returns the timestamp (wall clock time) at which the master node triggered the checkpoint
+ * for which the state snapshot is taken.
*/
long getCheckpointTimestamp();
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c69f366c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
index fb67ea7..dd93462 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
@@ -27,13 +27,38 @@ import java.io.Serializable;
* checkpointed. The functions get a call whenever a checkpoint should take place
* and return a snapshot of their state, which will be checkpointed.
*
- * <p>This interface marks a function as <i>synchronously</i> checkpointed. While the
- * state is written, the function is not called, so the function needs not return a
- * copy of its state, but may return a reference to its state. Functions that can
- * continue to work and mutate the state, even while the state snapshot is being accessed,
- * can implement the {@link CheckpointedAsynchronously} interface.</p>
+ * <h1>Deprecation and Replacement</h1>
+ *
+ * The shortcut replacement for this interface is via {@link ListCheckpointed} and works
+ * as shown in the example below. The {@code ListCheckpointed} interface returns a list of
+ * elements (the redistributable units of state) instead of a single state object.
+ *
+ *
+ *
+ * <pre>{@code
+ * public class ExampleFunction<T> implements MapFunction<T, T>, ListCheckpointed<Integer> {
+ *
+ * private int count;
+ *
+ * public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ * return Collections.singletonList(this.count);
+ * }
+ *
+ * public void restoreState(List<Integer> state) throws Exception {
+ * this.count = state.isEmpty() ? 0 : state.get(0);
+ * }
+ *
+ * public T map(T value) {
+ * count++;
+ * return value;
+ * }
+ * }
+ * }</pre>
*
* @param <T> The type of the operator state.
+ *
+ * @deprecated Please use {@link ListCheckpointed} as illustrated above, or
+ * {@link CheckpointedFunction} for more control over the checkpointing process.
*/
@Deprecated
@PublicEvolving
http://git-wip-us.apache.org/repos/asf/flink/blob/c69f366c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
index 6fcc1d5..4bafd90 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
@@ -18,38 +18,44 @@
package org.apache.flink.streaming.api.checkpoint;
-
import org.apache.flink.annotation.PublicEvolving;
import java.io.Serializable;
/**
- * This interface marks a function/operator as <i>asynchronously checkpointed</i>.
- * Similar to the {@link Checkpointed} interface, the function must produce a
- * snapshot of its state. However, the function must be able to continue working
- * and mutating its state without mutating the returned state snapshot.
+ * This interface marks a function/operator as checkpointed similar to the
+ * {@link Checkpointed} interface, but gives the Flink framework the option to
+ * perform the checkpoint asynchronously. Note that asynchronous checkpointing for
+ * this interface has not been implemented.
*
- * <p>Asynchronous checkpoints are desirable, because they allow the data streams at the
- * point of the checkpointed function/operator to continue running while the checkpoint
- * is in progress.</p>
+ * <h1>Deprecation and Replacement</h1>
+ *
+ * The shortcut replacement for this interface is via {@link ListCheckpointed} and works
+ * as shown in the example below. Please refer to the JavaDocs of {@link ListCheckpointed} for
+ * a more detailed description of how to use the new interface.
*
- * <p>To be able to support asynchronous snapshots, the state returned by the
- * {@link #snapshotState(long, long)} method is typically a copy or shadow copy
- * of the actual state.</p>
- * @deprecated Please use {@link ListCheckpointed} and {@link CheckpointedFunction}.
- *
- * The short cut replacement via {@link ListCheckpointed}
* <pre>{@code
- * public class ExampleOperator implements ListCheckpointed<Integer> {
+ * public class ExampleFunction<T> implements MapFunction<T, T>, ListCheckpointed<Integer> {
+ *
+ * private int count;
*
- * public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
- * return Collections.singletonList(this.value);
- * }
+ * public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ * return Collections.singletonList(this.count);
+ * }
*
- * public void restoreState(List<Integer> state) throws Exception {
- * this.value = state.get(0);
- * }
+ * public void restoreState(List<Integer> state) throws Exception {
+ * this.count = state.isEmpty() ? 0 : state.get(0);
+ * }
+ *
+ * public T map(T value) {
+ * count++;
+ * return value;
+ * }
+ * }
* }</pre>
+ *
+ * @deprecated Please use {@link ListCheckpointed} and {@link CheckpointedFunction} instead,
+ * as illustrated in the example above.
*/
@Deprecated
@PublicEvolving
http://git-wip-us.apache.org/repos/asf/flink/blob/c69f366c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
index 37d8244..51ac5db 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
@@ -19,29 +19,135 @@
package org.apache.flink.streaming.api.checkpoint;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
/**
+ * This is the core interface for <i>stateful transformation functions</i>, meaning functions
+ * that maintain state across individual stream records.
+ * While more lightweight interfaces exist as shortcuts for various types of state, this interface offers the
+ * greatest flexibility in managing both <i>keyed state</i> and <i>operator state</i>.
+ *
+ * <p>The section <a href="#shortcuts">Shortcuts</a> illustrates the common lightweight
+ * ways to set up stateful functions typically used instead of the full-fledged
+ * abstraction represented by this interface.
+ *
+ * <h1>Initialization</h1>
+ *
+ * The {@link CheckpointedFunction#initializeState(FunctionInitializationContext)} is called when
+ * the parallel instance of the transformation function is created during distributed execution.
+ * The method gives access to the {@link FunctionInitializationContext} which in turn gives access
+ * to the {@link OperatorStateStore} and {@link KeyedStateStore}.
+ *
+ * <p>The {@code OperatorStateStore} and {@code KeyedStateStore} give access to the data structures
+ * in which state should be stored for Flink to transparently manage and checkpoint it, such as
+ * {@link org.apache.flink.api.common.state.ValueState} or {@link org.apache.flink.api.common.state.ListState}.
+ *
+ * <p><i>Note:</i> The {@code KeyedStateStore} can only be used when the transformation supports
+ * <i>keyed state</i>, i.e., when it is applied on a keyed stream (after a {@code keyBy(...)}).
+ *
+ * <h1>Snapshot</h1>
+ *
+ * The {@link CheckpointedFunction#snapshotState(FunctionSnapshotContext)} is called whenever a
+ * checkpoint takes a state snapshot of the transformation function. Inside this method, functions typically
+ * make sure that the checkpointed data structures (obtained in the initialization phase) are up
+ * to date for a snapshot to be taken. The given snapshot context gives access to the metadata
+ * of the checkpoint.
+ *
+ * <p>In addition, functions can use this method as a hook to flush/commit/synchronize with
+ * external systems.
+ *
+ * <h1>Example</h1>
+ *
+ * The code example below illustrates how to use this interface for a function that keeps counts
+ * of events per key and per parallel partition (parallel instance of the transformation function
+ * during distributed execution).
+ * The example also handles changes of parallelism, which affect the count-per-parallel-partition,
+ * by adding up the counters of partitions that get merged on scale-down. Note that this is a
+ * toy example, but should illustrate the basic skeleton for a stateful function.
+ *
+ * <pre>{@code
+ * public class MyFunction<T> implements MapFunction<T, T>, CheckpointedFunction {
+ *
+ * private ReducingState<Long> countPerKey;
+ * private ListState<Long> countPerPartition;
+ *
+ * private long localCount;
+ *
+ * public void initializeState(FunctionInitializationContext context) throws Exception {
+ * // get the state data structure for the per-key state
+ * countPerKey = context.getKeyedStateStore().getReducingState(
+ * new ReducingStateDescriptor<>("perKeyCount", new AddFunction<>(), Long.class));
+ *
+ * // get the state data structure for the per-partition state
+ * countPerPartition = context.getOperatorStateStore().getOperatorState(
+ * new ListStateDescriptor<>("perPartitionCount", Long.class));
*
- * Similar to @{@link Checkpointed}, this interface must be implemented by functions that have potentially
- * repartitionable state that needs to be checkpointed. Methods from this interface are called upon checkpointing and
- * initialization of state.
+ * // initialize the "local count variable" based on the operator state
+ * for (Long l : countPerPartition.get()) {
+ * localCount += l;
+ * }
+ * }
*
- * On {@link #initializeState(FunctionInitializationContext)} the implementing class receives a
- * {@link FunctionInitializationContext} which provides access to the {@link OperatorStateStore} (all) and
- * {@link org.apache.flink.api.common.state.KeyedStateStore} (only for keyed operators). Those allow to register
- * managed operator / keyed user states. Furthermore, the context provides information whether or the operator was
- * restored.
+ * public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ * // the keyed state is always up to date anyways
+ * // just bring the per-partition state in shape
+ * countPerPartition.clear();
+ * countPerPartition.add(localCount);
+ * }
*
+ * public T map(T value) throws Exception {
+ * // update the states
+ * countPerKey.add(1L);
+ * localCount++;
*
- * In {@link #snapshotState(FunctionSnapshotContext)} the implementing class must ensure that all operator / keyed state
- * is passed to user states that have been registered during initialization, so that it is visible to the system
- * backends for checkpointing.
+ * return value;
+ * }
+ * }
+ * }</pre>
+ *
+ * <hr>
+ *
+ * <h1><a name="shortcuts">Shortcuts</a></h1>
+ *
+ * There are various ways that transformation functions can use state without implementing the
+ * full-fledged {@code CheckpointedFunction} interface:
+ *
+ * <h4>Operator State</h4>
*
+ * Checkpointing some state that is part of the function object itself is possible in a simpler way
+ * by directly implementing the {@link ListCheckpointed} interface.
+ * That mechanism is similar to the previously used {@link Checkpointed} interface.
+ *
+ * <h4>Keyed State</h4>
+ *
+ * Access to keyed state is possible via the {@link RuntimeContext}'s methods:
+ * <pre>{@code
+ * public class CountPerKeyFunction<T> extends RichMapFunction<T, T> {
+ *
+ * private ValueState<Long> count;
+ *
+ * public void open(Configuration cfg) throws Exception {
+ * count = getRuntimeContext().getState(new ValueStateDescriptor<>("myCount", Long.class));
+ * }
+ *
+ * public T map(T value) throws Exception {
+ * Long current = count.get();
+ * count.update(current == null ? 1L : current + 1);
+ *
+ * return value;
+ * }
+ * }
+ * }</pre>
+ *
+ * @see ListCheckpointed
+ * @see RuntimeContext
*/
@PublicEvolving
+@SuppressWarnings("deprecation")
public interface CheckpointedFunction {
/**
@@ -55,9 +161,8 @@ public interface CheckpointedFunction {
void snapshotState(FunctionSnapshotContext context) throws Exception;
/**
- * This method is called when an operator is initialized, so that the function can set up it's state through
- * the provided context. Initialization typically includes registering user states through the state stores
- * that the context offers.
+ * This method is called when the parallel function instance is created during distributed
+ * execution. Functions typically set up their state storing data structures in this method.
*
* @param context the context for initializing the operator
* @throws Exception
http://git-wip-us.apache.org/repos/asf/flink/blob/c69f366c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
index 5e85dc1..84a9700 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
@@ -19,29 +19,118 @@
package org.apache.flink.streaming.api.checkpoint;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.configuration.Configuration;
import java.io.Serializable;
import java.util.List;
/**
- * This method must be implemented by functions that have state that needs to be
- * checkpointed. The functions get a call whenever a checkpoint should take place
- * and return a snapshot of their state as a list of redistributable sub-states,
- * which will be checkpointed.
+ * This interface can be implemented by functions that want to store state in checkpoints.
+ * It can be used in a similar way as the deprecated {@link Checkpointed} interface, but
+ * supports <b>list-style state redistribution</b> for cases when the parallelism of the
+ * transformation is changed.
*
+ * <p>Implementing this interface is a shortcut for obtaining the default {@code ListState}
+ * from the {@link OperatorStateStore}. Using the {@code OperatorStateStore} directly gives
+ * more flexible options to use operator state, for example controlling the serialization
+ * of the state objects, or have multiple named states.
+ *
+ * <h2>State Redistribution</h2>
+ *
+ * State redistribution happens when the parallelism of the operator is changed.
+ * State redistribution of <i>operator state</i> (to which category the state handled by this
+ * interface belongs) always goes through a checkpoint, so it appears
+ * to the transformation functions like a failure/recovery combination, where recovery happens
+ * with a different parallelism.
+ *
+ * <p>Conceptually, the state in the checkpoint is the concatenated list of all lists
+ * returned by the parallel transformation function instances. When restoring from a checkpoint,
+ * the list is divided into sub-lists that are assigned to each parallel function instance.
+ *
+ * <p>The following sketch illustrates the state redistribution. The function runs with parallelism
+ * <i>3</i>. The first two parallel instances of the function return lists with two state elements,
+ * the third one a list with one element.
+ * <pre>
+ * func_1 func_2 func_3
+ * +----+----+ +----+----+ +----+
+ * | S1 | S2 | | S3 | S4 | | S5 |
+ * +----+----+ +----+----+ +----+
+ * </pre>
+ *
+ * Recovering the checkpoint with <i>parallelism = 5</i> yields the following state assignment:
+ * <pre>
+ * func_1 func_2 func_3 func_4 func_5
+ * +----+ +----+ +----+ +----+ +----+
+ * | S1 | | S2 | | S3 | | S4 | | S5 |
+ * +----+ +----+ +----+ +----+ +----+
+ * </pre>
+ *
+ * Recovering the checkpoint with <i>parallelism = 2</i> yields the following state assignment:
+ * <pre>
+ * func_1 func_2
+ * +----+----+----+ +----+----+
+ * | S1 | S2 | S3 | | S4 | S5 |
+ * +----+----+----+ +----+----+
+ * </pre>
+ *
+ * <h2>Example</h2>
+ *
+ * The following example illustrates how to implement a {@code MapFunction} that counts all elements
+ * passing through it, keeping the total count accurate under re-scaling (changes of parallelism).
+ *
+ * <pre>{@code
+ * public class CountingFunction<T> implements MapFunction<T, Tuple2<T, Long>>, ListCheckpointed<Long> {
+ *
+ * // this count is the number of elements in the parallel subtask
+ * private long count;
+ *
+ * {@literal @}Override
+ * public List<Long> snapshotState(long checkpointId, long timestamp) {
+ * // return a single element - our count
+ * return Collections.singletonList(count);
+ * }
+ *
+ * {@literal @}Override
+ * public void restoreState(List<Long> state) throws Exception {
+ * // in case of scale in, this adds up counters from different original subtasks
+ * // in case of scale out, this list may be empty
+ * for (Long l : state) {
+ * count += l;
+ * }
+ * }
+ *
+ * {@literal @}Override
+ * public Tuple2<T, Long> map(T value) {
+ * count++;
+ * return new Tuple2<>(value, count);
+ * }
+ * }
+ * }</pre>
+ *
* @param <T> The type of the operator state.
*/
@PublicEvolving
public interface ListCheckpointed<T extends Serializable> {
/**
- * Gets the current state of the function of operator. The state must reflect the result of all
- * prior invocations to this function.
+ * Gets the current state of the function. The state must reflect the result of all prior
+ * invocations to this function.
+ *
+ * <p>The returned list should contain one entry for each redistributable unit of state. See
+ * the {@link ListCheckpointed class docs} for an illustration of how list-style state
+ * redistribution works.
+ *
+ * <p>As a special case, the returned list may be null or empty (if the operator has no state)
+ * or it may contain a single element (if the operator state is indivisible).
*
- * @param checkpointId The ID of the checkpoint.
- * @param timestamp Timestamp of the checkpoint.
+ * @param checkpointId The ID of the checkpoint - a unique and monotonically increasing value.
+ * @param timestamp The wall clock timestamp when the checkpoint was triggered by the master.
+ *
* @return The operator state in a list of redistributable, atomic sub-states.
* Should not return null, but empty list instead.
+ *
* @throws Exception Thrown if the creation of the state object failed. This causes the
* checkpoint to fail. The system may decide to fail the operation (and trigger
* recovery), or to discard this checkpoint attempt and to continue running
@@ -51,11 +140,23 @@ public interface ListCheckpointed<T extends Serializable> {
/**
* Restores the state of the function or operator to that of a previous checkpoint.
- * This method is invoked when a function is executed as part of a recovery run.
- * <p>
- * Note that restoreState() is called before open().
- *
+ * This method is invoked when the function is executed after a failure recovery.
+ * The state list may be empty if no state is to be recovered by the particular parallel instance
+ * of the function.
+ *
+ * <p>The given state list will contain all the <i>sub states</i> that this parallel
+ * instance of the function needs to handle. Refer to the {@link ListCheckpointed class docs}
+ * for an illustration of how list-style state redistribution works.
+ *
+ * <p><b>Important:</b> When implementing this interface together with {@link RichFunction},
+ * then the {@code restoreState()} method is called before {@link RichFunction#open(Configuration)}.
+ *
* @param state The state to be restored as a list of atomic sub-states.
+ *
+ * @throws Exception Throwing an exception in this method causes the recovery to fail.
+ * The exact consequence depends on the configured failure handling strategy,
+ * but typically the system will re-attempt the recovery, or try recovering
+ * from a different checkpoint.
*/
void restoreState(List<T> state) throws Exception;
}
[2/6] flink git commit: [FLINK-5247] [streaming api] Fix checks for
allowed lateness in windowed streams
Posted by se...@apache.org.
[FLINK-5247] [streaming api] Fix checks for allowed lateness in windowed streams
Also, fix outdated documentation.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87af8419
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87af8419
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87af8419
Branch: refs/heads/master
Commit: 87af84194911eb1e0c3b3a894bb3f04b628fbf11
Parents: acfeeaf
Author: Rohit Agarwal <mi...@gmail.com>
Authored: Sat Dec 3 12:15:45 2016 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 24 17:20:11 2017 +0100
----------------------------------------------------------------------
.../flink/streaming/api/datastream/AllWindowedStream.java | 6 ++----
.../apache/flink/streaming/api/datastream/WindowedStream.java | 6 ++----
.../apache/flink/streaming/api/scala/AllWindowedStream.scala | 2 +-
.../org/apache/flink/streaming/api/scala/WindowedStream.scala | 2 +-
4 files changed, 6 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/87af8419/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 31ea001..bd11de3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -123,11 +123,9 @@ public class AllWindowedStream<T, W extends Window> {
@PublicEvolving
public AllWindowedStream<T, W> allowedLateness(Time lateness) {
long millis = lateness.toMilliseconds();
- if (allowedLateness < 0) {
+ if (millis < 0) {
throw new IllegalArgumentException("The allowed lateness cannot be negative.");
- } else if (allowedLateness != 0 && !windowAssigner.isEventTime()) {
- throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows.");
- } else {
+ } else if (windowAssigner.isEventTime()) {
this.allowedLateness = millis;
}
return this;
http://git-wip-us.apache.org/repos/asf/flink/blob/87af8419/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 0a02885..c360ea1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -146,11 +146,9 @@ public class WindowedStream<T, K, W extends Window> {
@PublicEvolving
public WindowedStream<T, K, W> allowedLateness(Time lateness) {
long millis = lateness.toMilliseconds();
- if (allowedLateness < 0) {
+ if (millis < 0) {
throw new IllegalArgumentException("The allowed lateness cannot be negative.");
- } else if (allowedLateness != 0 && !windowAssigner.isEventTime()) {
- throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows.");
- } else {
+ } else if (windowAssigner.isEventTime()) {
this.allowedLateness = millis;
}
return this;
http://git-wip-us.apache.org/repos/asf/flink/blob/87af8419/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 83104e8..324689a 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -58,7 +58,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
/**
* Sets the allowed lateness to a user-specified value.
- * If not explicitly set, the allowed lateness is [[Long.MaxValue]].
+ * If not explicitly set, the allowed lateness is `0L`.
* Setting the allowed lateness is only valid for event-time windows.
* If a value different than 0 is provided with a processing-time
* [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]],
http://git-wip-us.apache.org/repos/asf/flink/blob/87af8419/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 76d9cda..db187ea 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -61,7 +61,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
/**
* Sets the allowed lateness to a user-specified value.
- * If not explicitly set, the allowed lateness is [[Long.MaxValue]].
+ * If not explicitly set, the allowed lateness is [[0L]].
* Setting the allowed lateness is only valid for event-time windows.
* If a value different than 0 is provided with a processing-time
* [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]],
[6/6] flink git commit: [FLINK-5577] [yarn] Fix application id growth
by reusing Yarn Client
Posted by se...@apache.org.
[FLINK-5577] [yarn] Fix application id growth by reusing Yarn Client
This closes #3173
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84064c96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84064c96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84064c96
Branch: refs/heads/master
Commit: 84064c969a31fe44579b4b1da588d5b117eb4c7d
Parents: c69f366
Author: fengyelei <fe...@huawei.com>
Authored: Thu Jan 19 23:27:11 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 24 17:20:12 2017 +0100
----------------------------------------------------------------------
.../org/apache/flink/yarn/AbstractYarnClusterDescriptor.java | 6 ++----
.../main/java/org/apache/flink/yarn/YarnClusterClientV2.java | 4 +++-
2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/84064c96/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 6d54c5e..b537e09 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -499,7 +499,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// Create application via yarnClient
final YarnClientApplication yarnApplication = yarnClient.createApplication();
GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
- ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
Resource maxRes = appResponse.getMaximumResourceCapability();
final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
@@ -555,7 +554,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
}
- ApplicationReport report = startAppMaster(null, yarnClient);
+ ApplicationReport report = startAppMaster(null, yarnClient, yarnApplication);
String host = report.getHost();
int port = report.getRpcPort();
@@ -568,7 +567,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir, true);
}
- public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient) throws Exception {
+ public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient, YarnClientApplication yarnApplication) throws Exception {
// ------------------ Set default file system scheme -------------------------
@@ -592,7 +591,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
+ "The Flink YARN client needs to store its files in a distributed file system");
}
- final YarnClientApplication yarnApplication = yarnClient.createApplication();
ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
Set<File> effectiveShipFiles = new HashSet<>(shipFiles.size());
for (File file : shipFiles) {
http://git-wip-us.apache.org/repos/asf/flink/blob/84064c96/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
index e9c6636..5a3a3c0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,7 +91,8 @@ public class YarnClusterClientV2 extends ClusterClient {
protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
try {
// Create application via yarnClient
- ApplicationReport report = this.clusterDescriptor.startAppMaster(jobGraph, yarnClient);
+ final YarnClientApplication yarnApplication = yarnClient.createApplication();
+ ApplicationReport report = this.clusterDescriptor.startAppMaster(jobGraph, yarnClient, yarnApplication);
if (report.getYarnApplicationState().equals(YarnApplicationState.RUNNING)) {
appId = report.getApplicationId();
trackingURL = report.getTrackingUrl();
[4/6] flink git commit: [hotfix] [streaming api] Minor cleanup in
WindowedStream and AllWindowedStream
Posted by se...@apache.org.
[hotfix] [streaming api] Minor cleanup in WindowedStream and AllWindowedStream
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6342d6db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6342d6db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6342d6db
Branch: refs/heads/master
Commit: 6342d6db1de5f38a921732e35abd83e6c5b9305a
Parents: 87af841
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jan 23 14:55:48 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 24 17:20:11 2017 +0100
----------------------------------------------------------------------
.../streaming/api/datastream/AllWindowedStream.java | 12 ++++++------
.../flink/streaming/api/datastream/WindowedStream.java | 11 +++++------
2 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6342d6db/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index bd11de3..5de1774 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -53,6 +53,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* A {@code AllWindowedStream} represents a data stream where the stream of
* elements is split into windows based on a
@@ -122,12 +124,10 @@ public class AllWindowedStream<T, W extends Window> {
*/
@PublicEvolving
public AllWindowedStream<T, W> allowedLateness(Time lateness) {
- long millis = lateness.toMilliseconds();
- if (millis < 0) {
- throw new IllegalArgumentException("The allowed lateness cannot be negative.");
- } else if (windowAssigner.isEventTime()) {
- this.allowedLateness = millis;
- }
+ final long millis = lateness.toMilliseconds();
+ checkArgument(millis >= 0, "The allowed lateness cannot be negative.");
+
+ this.allowedLateness = millis;
return this;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6342d6db/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index c360ea1..c74bad7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -66,6 +66,7 @@ import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -145,12 +146,10 @@ public class WindowedStream<T, K, W extends Window> {
*/
@PublicEvolving
public WindowedStream<T, K, W> allowedLateness(Time lateness) {
- long millis = lateness.toMilliseconds();
- if (millis < 0) {
- throw new IllegalArgumentException("The allowed lateness cannot be negative.");
- } else if (windowAssigner.isEventTime()) {
- this.allowedLateness = millis;
- }
+ final long millis = lateness.toMilliseconds();
+ checkArgument(millis >= 0, "The allowed lateness cannot be negative.");
+
+ this.allowedLateness = millis;
return this;
}
[3/6] flink git commit: [FLINK-4905] [kafka 08 consumer] Suppress
offset committing failures when fetcher is shutting down
Posted by se...@apache.org.
[FLINK-4905] [kafka 08 consumer] Suppress offset committing failures when fetcher is shutting down
This closes #3035
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7cda75b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7cda75b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7cda75b
Branch: refs/heads/master
Commit: e7cda75b8594417559d6aac6229b5893f5459f0f
Parents: 6342d6d
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jan 23 16:28:54 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 24 17:20:11 2017 +0100
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaConsumer08.java | 7 ++++---
.../connectors/kafka/internals/Kafka08Fetcher.java | 13 +++++++++++--
.../streaming/connectors/kafka/Kafka08ITCase.java | 7 -------
3 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e7cda75b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 0aacccd..0f11c72 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -292,11 +292,12 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
}
}
break retryLoop; // leave the loop through the brokers
- } catch (Exception e) {
+ }
+ catch (Exception e) {
//validates seed brokers in case of a ClosedChannelException
validateSeedBrokers(seedBrokers, e);
- LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString() + "." +
- "" + e.getClass() + ". Message: " + e.getMessage());
+ LOG.warn("Error communicating with broker {} to find partitions for {}. {} Message: {}",
+ seedBroker, topics, e.getClass().getName(), e.getMessage());
LOG.debug("Detailed trace", e);
// we sleep a bit. Retrying immediately doesn't make sense in cases where Kafka is reorganizing the leader metadata
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/e7cda75b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index d015157..5a0aed3 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -347,8 +347,17 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
if (zkHandler != null) {
- // the ZK handler takes care of incrementing the offsets by 1 before committing
- zkHandler.prepareAndCommitOffsets(offsets);
+ try {
+ // the ZK handler takes care of incrementing the offsets by 1 before committing
+ zkHandler.prepareAndCommitOffsets(offsets);
+ }
+ catch (Exception e) {
+ if (running) {
+ throw e;
+ } else {
+ return;
+ }
+ }
}
// Set committed offsets in topic partition state
http://git-wip-us.apache.org/repos/asf/flink/blob/e7cda75b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index fabb0fe..0cdf465 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -19,19 +19,12 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.runtime.client.JobCancellationException;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.junit.Assert;
import org.junit.Test;
import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;