You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/13 12:44:19 UTC

[2/4] flink git commit: [FLINK-5163] Port the FromElementsFunction to the new state abstractions.

[FLINK-5163] Port the FromElementsFunction to the new state abstractions.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d24833db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d24833db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d24833db

Branch: refs/heads/master
Commit: d24833dbb641979008ba18be8b2ddb694a5d43e1
Parents: 685c4f8
Author: kl0u <kk...@gmail.com>
Authored: Thu Nov 17 16:52:50 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 13 13:38:18 2016 +0100

----------------------------------------------------------------------
 .../functions/source/FromElementsFunction.java  | 61 ++++++++++++++++----
 .../api/functions/FromElementsFunctionTest.java | 35 +++++++----
 2 files changed, 72 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d24833db/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index 98bc10e..e3a9d54 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -18,17 +18,25 @@
 package org.apache.flink.streaming.api.functions.source;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.util.Preconditions;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 
 /**
  * A stream source function that returns a sequence of elements.
@@ -36,11 +44,14 @@ import java.util.Collection;
  * <p>Upon construction, this source function serializes the elements using Flink's type information.
  * That way, any object transport using Java serialization will not be affected by the serializability
  * of the elements.</p>
- * 
+ *
+ * <p>
+ * <b>NOTE:</b> This source has a parallelism of 1.
+ *
  * @param <T> The type of elements returned by this function.
  */
 @PublicEvolving
-public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedAsynchronously<Integer> {
+public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedFunction {
 	
 	private static final long serialVersionUID = 1L;
 
@@ -62,7 +73,8 @@ public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedA
 	/** Flag to make the source cancelable */
 	private volatile boolean isRunning = true;
 
-	
+	private transient ListState<Integer> checkpointedState;
+
 	public FromElementsFunction(TypeSerializer<T> serializer, T... elements) throws IOException {
 		this(serializer, Arrays.asList(elements));
 	}
@@ -88,6 +100,32 @@ public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedA
 	}
 
 	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		Preconditions.checkState(this.checkpointedState == null,
+			"The " + getClass().getSimpleName() + " has already been initialized.");
+
+		this.checkpointedState = context.getOperatorStateStore().getOperatorState(
+			new ListStateDescriptor<>(
+				"from-elements-state",
+				IntSerializer.INSTANCE
+			)
+		);
+
+		if (context.isRestored()) {
+			List<Integer> retrievedStates = new ArrayList<>();
+			for (Integer entry : this.checkpointedState.get()) {
+				retrievedStates.add(entry);
+			}
+
+			// given that the parallelism of the function is 1, we can only have 1 state
+			Preconditions.checkArgument(retrievedStates.size() == 1,
+				getClass().getSimpleName() + " retrieved invalid state.");
+
+			this.numElementsToSkip = retrievedStates.get(0);
+		}
+	}
+
+	@Override
 	public void run(SourceContext<T> ctx) throws Exception {
 		ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
 		final DataInputView input = new DataInputViewStreamWrapper(bais);
@@ -157,17 +195,16 @@ public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedA
 	// ------------------------------------------------------------------------
 	//  Checkpointing
 	// ------------------------------------------------------------------------
-	
-	@Override
-	public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-		return this.numElementsEmitted;
-	}
 
 	@Override
-	public void restoreState(Integer state) {
-		this.numElementsToSkip = state;
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		Preconditions.checkState(this.checkpointedState != null,
+			"The " + getClass().getSimpleName() + " has not been properly initialized.");
+
+		this.checkpointedState.clear();
+		this.checkpointedState.add(this.numElementsEmitted);
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d24833db/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
index 41bd381..01540da 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
@@ -26,9 +26,11 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.ExceptionUtils;
 
@@ -141,9 +143,12 @@ public class FromElementsFunctionTest {
 				data.add(i);
 			}
 			
-			final FromElementsFunction<Integer> source = new FromElementsFunction<Integer>(IntSerializer.INSTANCE, data);
-			final FromElementsFunction<Integer> sourceCopy = CommonTestUtils.createCopySerializable(source);
-			
+			final FromElementsFunction<Integer> source = new FromElementsFunction<>(IntSerializer.INSTANCE, data);
+			StreamSource<Integer, FromElementsFunction<Integer>> src = new StreamSource<>(source);
+			AbstractStreamOperatorTestHarness<Integer> testHarness =
+				new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
+			testHarness.open();
+
 			final SourceFunction.SourceContext<Integer> ctx = new ListSourceContext<Integer>(result, 2L);
 			
 			final Throwable[] error = new Throwable[1];
@@ -166,11 +171,10 @@ public class FromElementsFunctionTest {
 			Thread.sleep(1000);
 			
 			// make a checkpoint
-			int count;
-			List<Integer> checkpointData = new ArrayList<Integer>(NUM_ELEMENTS);
-			
+			List<Integer> checkpointData = new ArrayList<>(NUM_ELEMENTS);
+			OperatorStateHandles handles = null;
 			synchronized (ctx.getCheckpointLock()) {
-				count = source.snapshotState(566, System.currentTimeMillis());
+				handles = testHarness.snapshot(566, System.currentTimeMillis());
 				checkpointData.addAll(result);
 			}
 			
@@ -184,11 +188,18 @@ public class FromElementsFunctionTest {
 				error[0].printStackTrace();
 				fail("Error in asynchronous source runner");
 			}
-			
+
+			final FromElementsFunction<Integer> sourceCopy = new FromElementsFunction<>(IntSerializer.INSTANCE, data);
+			StreamSource<Integer, FromElementsFunction<Integer>> srcCopy = new StreamSource<>(sourceCopy);
+			AbstractStreamOperatorTestHarness<Integer> testHarnessCopy =
+				new AbstractStreamOperatorTestHarness<>(srcCopy, 1, 1, 0);
+			testHarnessCopy.setup();
+			testHarnessCopy.initializeState(handles);
+			testHarnessCopy.open();
+
 			// recovery run
-			SourceFunction.SourceContext<Integer> newCtx = new ListSourceContext<Integer>(checkpointData);
-			sourceCopy.restoreState(count);
-			
+			SourceFunction.SourceContext<Integer> newCtx = new ListSourceContext<>(checkpointData);
+
 			sourceCopy.run(newCtx);
 			
 			assertEquals(data, checkpointData);