You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/13 12:44:19 UTC
[2/4] flink git commit: [FLINK-5163] Port the FromElementsFunction to the new state abstractions.
[FLINK-5163] Port the FromElementsFunction to the new state abstractions.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d24833db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d24833db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d24833db
Branch: refs/heads/master
Commit: d24833dbb641979008ba18be8b2ddb694a5d43e1
Parents: 685c4f8
Author: kl0u <kk...@gmail.com>
Authored: Thu Nov 17 16:52:50 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 13 13:38:18 2016 +0100
----------------------------------------------------------------------
.../functions/source/FromElementsFunction.java | 61 ++++++++++++++++----
.../api/functions/FromElementsFunctionTest.java | 35 +++++++----
2 files changed, 72 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d24833db/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index 98bc10e..e3a9d54 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -18,17 +18,25 @@
package org.apache.flink.streaming.api.functions.source;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.util.Preconditions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
/**
* A stream source function that returns a sequence of elements.
@@ -36,11 +44,14 @@ import java.util.Collection;
* <p>Upon construction, this source function serializes the elements using Flink's type information.
* That way, any object transport using Java serialization will not be affected by the serializability
* of the elements.</p>
- *
+ *
+ * <p>
+ * <b>NOTE:</b> This source has a parallelism of 1.
+ *
* @param <T> The type of elements returned by this function.
*/
@PublicEvolving
-public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedAsynchronously<Integer> {
+public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedFunction {
private static final long serialVersionUID = 1L;
@@ -62,7 +73,8 @@ public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedA
/** Flag to make the source cancelable */
private volatile boolean isRunning = true;
-
+ private transient ListState<Integer> checkpointedState;
+
public FromElementsFunction(TypeSerializer<T> serializer, T... elements) throws IOException {
this(serializer, Arrays.asList(elements));
}
@@ -88,6 +100,32 @@ public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedA
}
@Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ Preconditions.checkState(this.checkpointedState == null,
+ "The " + getClass().getSimpleName() + " has already been initialized.");
+
+ this.checkpointedState = context.getOperatorStateStore().getOperatorState(
+ new ListStateDescriptor<>(
+ "from-elements-state",
+ IntSerializer.INSTANCE
+ )
+ );
+
+ if (context.isRestored()) {
+ List<Integer> retrievedStates = new ArrayList<>();
+ for (Integer entry : this.checkpointedState.get()) {
+ retrievedStates.add(entry);
+ }
+
+ // given that the parallelism of the function is 1, we can only have 1 state
+ Preconditions.checkArgument(retrievedStates.size() == 1,
+ getClass().getSimpleName() + " retrieved invalid state.");
+
+ this.numElementsToSkip = retrievedStates.get(0);
+ }
+ }
+
+ @Override
public void run(SourceContext<T> ctx) throws Exception {
ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
final DataInputView input = new DataInputViewStreamWrapper(bais);
@@ -157,17 +195,16 @@ public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedA
// ------------------------------------------------------------------------
// Checkpointing
// ------------------------------------------------------------------------
-
- @Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return this.numElementsEmitted;
- }
@Override
- public void restoreState(Integer state) {
- this.numElementsToSkip = state;
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ Preconditions.checkState(this.checkpointedState != null,
+ "The " + getClass().getSimpleName() + " has not been properly initialized.");
+
+ this.checkpointedState.clear();
+ this.checkpointedState.add(this.numElementsEmitted);
}
-
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d24833db/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
index 41bd381..01540da 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
@@ -26,9 +26,11 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.types.Value;
import org.apache.flink.util.ExceptionUtils;
@@ -141,9 +143,12 @@ public class FromElementsFunctionTest {
data.add(i);
}
- final FromElementsFunction<Integer> source = new FromElementsFunction<Integer>(IntSerializer.INSTANCE, data);
- final FromElementsFunction<Integer> sourceCopy = CommonTestUtils.createCopySerializable(source);
-
+ final FromElementsFunction<Integer> source = new FromElementsFunction<>(IntSerializer.INSTANCE, data);
+ StreamSource<Integer, FromElementsFunction<Integer>> src = new StreamSource<>(source);
+ AbstractStreamOperatorTestHarness<Integer> testHarness =
+ new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
+ testHarness.open();
+
final SourceFunction.SourceContext<Integer> ctx = new ListSourceContext<Integer>(result, 2L);
final Throwable[] error = new Throwable[1];
@@ -166,11 +171,10 @@ public class FromElementsFunctionTest {
Thread.sleep(1000);
// make a checkpoint
- int count;
- List<Integer> checkpointData = new ArrayList<Integer>(NUM_ELEMENTS);
-
+ List<Integer> checkpointData = new ArrayList<>(NUM_ELEMENTS);
+ OperatorStateHandles handles = null;
synchronized (ctx.getCheckpointLock()) {
- count = source.snapshotState(566, System.currentTimeMillis());
+ handles = testHarness.snapshot(566, System.currentTimeMillis());
checkpointData.addAll(result);
}
@@ -184,11 +188,18 @@ public class FromElementsFunctionTest {
error[0].printStackTrace();
fail("Error in asynchronous source runner");
}
-
+
+ final FromElementsFunction<Integer> sourceCopy = new FromElementsFunction<>(IntSerializer.INSTANCE, data);
+ StreamSource<Integer, FromElementsFunction<Integer>> srcCopy = new StreamSource<>(sourceCopy);
+ AbstractStreamOperatorTestHarness<Integer> testHarnessCopy =
+ new AbstractStreamOperatorTestHarness<>(srcCopy, 1, 1, 0);
+ testHarnessCopy.setup();
+ testHarnessCopy.initializeState(handles);
+ testHarnessCopy.open();
+
// recovery run
- SourceFunction.SourceContext<Integer> newCtx = new ListSourceContext<Integer>(checkpointData);
- sourceCopy.restoreState(count);
-
+ SourceFunction.SourceContext<Integer> newCtx = new ListSourceContext<>(checkpointData);
+
sourceCopy.run(newCtx);
assertEquals(data, checkpointData);