You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/30 12:47:52 UTC
[02/10] flink git commit: [FLINK-4379] [checkpoints] Introduce
rescalable operator state
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 2036f69..f638ddd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -317,7 +318,7 @@ public class StreamMockEnvironment implements Environment {
@Override
public void acknowledgeCheckpoint(
long checkpointId,
- ChainedStateHandle<StreamStateHandle> chainedStateHandle, List<KeyGroupsStateHandle> keyGroupStateHandles,
+ CheckpointStateHandles checkpointStateHandles,
long synchronousDurationMillis, long asynchronousDurationMillis,
long bytesBufferedInAlignment, long alignmentDurationNanos) {
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 430c6de..247edd6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -24,12 +24,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.mockito.invocation.InvocationOnMock;
@@ -41,11 +43,12 @@ import java.util.Collections;
import java.util.concurrent.RunnableFuture;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doAnswer;
/**
* Extension of {@link OneInputStreamOperatorTestHarness} that allows the operator to get
- * a {@link KeyedStateBackend}.
+ * a {@link AbstractKeyedStateBackend}.
*
*/
public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
@@ -53,7 +56,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
// in case the operator creates one we store it here so that we
// can snapshot its state
- private KeyedStateBackend<?> keyedStateBackend = null;
+ private AbstractKeyedStateBackend<?> keyedStateBackend = null;
// when we restore we keep the state here so that we can call restore
// when the operator requests the keyed state backend
@@ -114,7 +117,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
final KeyGroupRange keyGroupRange = (KeyGroupRange) invocationOnMock.getArguments()[2];
if(keyedStateBackend != null) {
- keyedStateBackend.close();
+ keyedStateBackend.dispose();
}
if (restoredKeyedState == null) {
@@ -148,7 +151,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
}
/**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#snapshotState(org.apache.flink.core.fs.FSDataOutputStream, long, long)} ()}
+ *
*/
@Override
public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
@@ -159,7 +162,9 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
CheckpointStreamFactory.CheckpointStateOutputStream outStream =
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
- operator.snapshotState(outStream, checkpointId, timestamp);
+ if (operator instanceof StreamCheckpointedOperator) {
+ ((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp);
+ }
if (keyedStateBackend != null) {
RunnableFuture<KeyGroupsStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(checkpointId,
@@ -180,17 +185,21 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
}
/**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(org.apache.flink.core.fs.FSDataInputStream)} ()}
+ *
*/
@Override
public void restore(StreamStateHandle snapshot) throws Exception {
- FSDataInputStream inStream = snapshot.openInputStream();
- operator.restoreState(inStream);
+ try (FSDataInputStream inStream = snapshot.openInputStream()) {
+
+ if (operator instanceof StreamCheckpointedOperator) {
+ ((StreamCheckpointedOperator) operator).restoreState(inStream);
+ }
- byte keyedStatePresent = (byte) inStream.read();
- if (keyedStatePresent == 1) {
- ObjectInputStream ois = new ObjectInputStream(inStream);
- this.restoredKeyedState = (KeyGroupsStateHandle) ois.readObject();
+ byte keyedStatePresent = (byte) inStream.read();
+ if (keyedStatePresent == 1) {
+ ObjectInputStream ois = new ObjectInputStream(inStream);
+ this.restoredKeyedState = (KeyGroupsStateHandle) ois.readObject();
+ }
}
}
@@ -200,7 +209,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
public void close() throws Exception {
super.close();
if(keyedStateBackend != null) {
- keyedStateBackend.close();
+ keyedStateBackend.dispose();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index acf046a..d6f46fd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
@@ -39,7 +40,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -204,14 +204,18 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
}
/**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#snapshotState(FSDataOutputStream, long, long)} ()}
+ *
*/
public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory(
new JobID(),
"test_op").createCheckpointStateOutputStream(checkpointId, timestamp);
- operator.snapshotState(outStream, checkpointId, timestamp);
- return outStream.closeAndGetHandle();
+ if(operator instanceof StreamCheckpointedOperator) {
+ ((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp);
+ return outStream.closeAndGetHandle();
+ } else {
+ throw new RuntimeException("Operator is not StreamCheckpointedOperator");
+ }
}
/**
@@ -222,10 +226,16 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
}
/**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(org.apache.flink.core.fs.FSDataInputStream)} ()}
+ *
*/
public void restore(StreamStateHandle snapshot) throws Exception {
- operator.restoreState(snapshot.openInputStream());
+ if(operator instanceof StreamCheckpointedOperator) {
+ try (FSDataInputStream in = snapshot.openInputStream()) {
+ ((StreamCheckpointedOperator) operator).restoreState(in);
+ }
+ } else {
+ throw new RuntimeException("Operator is not StreamCheckpointedOperator");
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
index c12bcb9..5874f56 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -35,6 +36,8 @@ import org.apache.flink.streaming.api.operators.StreamGroupedFold;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
import org.junit.Assert;
+import java.util.Collections;
+import java.util.List;
import java.util.Queue;
import java.util.Random;
@@ -180,7 +183,7 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
*/
private static class OnceFailingIdentityMapFunction
extends RichMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>
- implements Checkpointed<Long> {
+ implements ListCheckpointed<Long> {
private static volatile boolean hasFailed = false;
@@ -211,15 +214,16 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
return value;
}
-
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return count;
+ public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(count);
}
@Override
- public void restoreState(Long state) {
- count = state;
+ public void restoreState(List<Long> state) throws Exception {
+ if(!state.isEmpty()) {
+ count = state.get(0);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 694f006..2a635ab 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -21,17 +21,18 @@ package org.apache.flink.test.streaming.runtime;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.junit.Test;
@@ -66,7 +67,7 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- getRuntimeContext().getKeyValueState("test", String.class, "");
+ getRuntimeContext().getState(new ValueStateDescriptor<Integer>("Test", Integer.class, 0));
}
@Override
@@ -99,7 +100,8 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
}
@Override
- public <K> KeyedStateBackend<K> createKeyedStateBackend(Environment env,
+ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+ Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
@@ -110,7 +112,8 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
}
@Override
- public <K> KeyedStateBackend<K> restoreKeyedStateBackend(Environment env,
+ public <K> AbstractKeyedStateBackend<K> restoreKeyedStateBackend(
+ Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,