You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/30 12:47:52 UTC

[02/10] flink git commit: [FLINK-4379] [checkpoints] Introduce rescalable operator state

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 2036f69..f638ddd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -317,7 +318,7 @@ public class StreamMockEnvironment implements Environment {
 	@Override
 	public void acknowledgeCheckpoint(
 			long checkpointId,
-			ChainedStateHandle<StreamStateHandle> chainedStateHandle, List<KeyGroupsStateHandle> keyGroupStateHandles,
+			CheckpointStateHandles checkpointStateHandles,
 			long synchronousDurationMillis, long asynchronousDurationMillis,
 			long bytesBufferedInAlignment, long alignmentDurationNanos) {
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 430c6de..247edd6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -24,12 +24,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.mockito.invocation.InvocationOnMock;
@@ -41,11 +43,12 @@ import java.util.Collections;
 import java.util.concurrent.RunnableFuture;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doAnswer;
 
 /**
  * Extension of {@link OneInputStreamOperatorTestHarness} that allows the operator to get
- * a {@link KeyedStateBackend}.
+ * an {@link AbstractKeyedStateBackend}.
  *
  */
 public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
@@ -53,7 +56,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 
 	// in case the operator creates one we store it here so that we
 	// can snapshot its state
-	private KeyedStateBackend<?> keyedStateBackend = null;
+	private AbstractKeyedStateBackend<?> keyedStateBackend = null;
 
 	// when we restore we keep the state here so that we can call restore
 	// when the operator requests the keyed state backend
@@ -114,7 +117,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 					final KeyGroupRange keyGroupRange = (KeyGroupRange) invocationOnMock.getArguments()[2];
 
 					if(keyedStateBackend != null) {
-						keyedStateBackend.close();
+						keyedStateBackend.dispose();
 					}
 
 					if (restoredKeyedState == null) {
@@ -148,7 +151,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 	}
 
 	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#snapshotState(org.apache.flink.core.fs.FSDataOutputStream, long, long)} ()}
+	 *
 	 */
 	@Override
 	public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
@@ -159,7 +162,9 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 		CheckpointStreamFactory.CheckpointStateOutputStream outStream =
 				streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
 
-		operator.snapshotState(outStream, checkpointId, timestamp);
+		if (operator instanceof StreamCheckpointedOperator) {
+			((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp);
+		}
 
 		if (keyedStateBackend != null) {
 			RunnableFuture<KeyGroupsStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(checkpointId,
@@ -180,17 +185,21 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 	}
 
 	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(org.apache.flink.core.fs.FSDataInputStream)} ()}
+	 * 
 	 */
 	@Override
 	public void restore(StreamStateHandle snapshot) throws Exception {
-		FSDataInputStream inStream = snapshot.openInputStream();
-		operator.restoreState(inStream);
+		try (FSDataInputStream inStream = snapshot.openInputStream()) {
+
+			if (operator instanceof StreamCheckpointedOperator) {
+				((StreamCheckpointedOperator) operator).restoreState(inStream);
+			}
 
-		byte keyedStatePresent = (byte) inStream.read();
-		if (keyedStatePresent == 1) {
-			ObjectInputStream ois = new ObjectInputStream(inStream);
-			this.restoredKeyedState = (KeyGroupsStateHandle) ois.readObject();
+			byte keyedStatePresent = (byte) inStream.read();
+			if (keyedStatePresent == 1) {
+				ObjectInputStream ois = new ObjectInputStream(inStream);
+				this.restoredKeyedState = (KeyGroupsStateHandle) ois.readObject();
+			}
 		}
 	}
 
@@ -200,7 +209,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 	public void close() throws Exception {
 		super.close();
 		if(keyedStateBackend != null) {
-			keyedStateBackend.close();
+			keyedStateBackend.dispose();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index acf046a..d6f46fd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
@@ -39,7 +40,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -204,14 +204,18 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	}
 
 	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#snapshotState(FSDataOutputStream, long, long)} ()}
+	 *
 	 */
 	public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
 		CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory(
 				new JobID(),
 				"test_op").createCheckpointStateOutputStream(checkpointId, timestamp);
-		operator.snapshotState(outStream, checkpointId, timestamp);
-		return outStream.closeAndGetHandle();
+		if(operator instanceof StreamCheckpointedOperator) {
+			((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp);
+			return outStream.closeAndGetHandle();
+		} else {
+			throw new RuntimeException("Operator is not StreamCheckpointedOperator");
+		}
 	}
 
 	/**
@@ -222,10 +226,16 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	}
 
 	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(org.apache.flink.core.fs.FSDataInputStream)} ()}
+	 *
 	 */
 	public void restore(StreamStateHandle snapshot) throws Exception {
-		operator.restoreState(snapshot.openInputStream());
+		if(operator instanceof StreamCheckpointedOperator) {
+			try (FSDataInputStream in = snapshot.openInputStream()) {
+				((StreamCheckpointedOperator) operator).restoreState(in);
+			}
+		} else {
+			throw new RuntimeException("Operator is not StreamCheckpointedOperator");
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
index c12bcb9..5874f56 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -35,6 +36,8 @@ import org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
 import org.junit.Assert;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Queue;
 import java.util.Random;
 
@@ -180,7 +183,7 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
 	 */
 	private static class OnceFailingIdentityMapFunction
 			extends RichMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> 
-			implements Checkpointed<Long> {
+			implements ListCheckpointed<Long> {
 
 		private static volatile boolean hasFailed = false;
 
@@ -211,15 +214,16 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
 			return value;
 		}
 
-
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return count;
+		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(count);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			count = state;
+		public void restoreState(List<Long> state) throws Exception {
+			if(!state.isEmpty()) {
+				count = state.get(0);
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 694f006..2a635ab 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -21,17 +21,18 @@ package org.apache.flink.test.streaming.runtime;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.junit.Test;
@@ -66,7 +67,7 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 				@Override
 				public void open(Configuration parameters) throws Exception {
 					super.open(parameters);
-					getRuntimeContext().getKeyValueState("test", String.class, "");
+					getRuntimeContext().getState(new ValueStateDescriptor<Integer>("Test", Integer.class, 0));
 				}
 
 				@Override
@@ -99,7 +100,8 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 		}
 
 		@Override
-		public <K> KeyedStateBackend<K> createKeyedStateBackend(Environment env,
+		public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+				Environment env,
 				JobID jobID,
 				String operatorIdentifier,
 				TypeSerializer<K> keySerializer,
@@ -110,7 +112,8 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 		}
 
 		@Override
-		public <K> KeyedStateBackend<K> restoreKeyedStateBackend(Environment env,
+		public <K> AbstractKeyedStateBackend<K> restoreKeyedStateBackend(
+				Environment env,
 				JobID jobID,
 				String operatorIdentifier,
 				TypeSerializer<K> keySerializer,