You are viewing a plain-text version of this content; the canonical (hyperlinked) version is available from the original mailing-list archive.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/06/25 19:21:36 UTC
[03/12] flink git commit: [streaming] Initial rework of the operator state interfaces
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index c745e6c..0a26ebb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -18,17 +18,29 @@
package org.apache.flink.streaming.runtime.tasks;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.collector.selector.BroadcastOutputSelectorWrapper;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
@@ -42,18 +54,6 @@ import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicLong;
-
@RunWith(PowerMockRunner.class)
@PrepareForTest({Task.class, ResultPartitionWriter.class})
public class SourceStreamTaskTest extends StreamTaskTestBase {
@@ -144,7 +144,7 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
Assert.assertEquals(NUM_ELEMENTS, outList.size());
}
- private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed {
+ private static class MockSource extends RichSourceFunction<Tuple2<Long, Integer>> implements StateCheckpointer<Integer, Integer> {
private static final long serialVersionUID = 1;
@@ -156,6 +156,7 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
private volatile long lastCheckpointId = -1;
private Semaphore semaphore;
+ private OperatorState<Integer> state;
private volatile boolean isRunning = true;
@@ -164,7 +165,7 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
this.checkpointDelay = checkpointDelay;
this.readDelay = readDelay;
this.count = 0;
- semaphore = new Semaphore(1);
+ this.semaphore = new Semaphore(1);
}
@Override
@@ -189,32 +190,33 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
public void cancel() {
isRunning = false;
}
+
+ @Override
+ public void open(Configuration conf){
+ state = getRuntimeContext().getOperatorState(1, this);
+ }
+
@Override
- public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ public Integer snapshotState(Integer state, long checkpointId, long checkpointTimestamp) {
if (!semaphore.tryAcquire()) {
Assert.fail("Concurrent invocation of snapshotState.");
- }
- int startCount = count;
- lastCheckpointId = checkpointId;
-
- long sum = 0;
- for (int i = 0; i < checkpointDelay; i++) {
- sum += new Random().nextLong();
- }
-
- if (startCount != count) {
+ } else {
+ int startCount = count;
+ lastCheckpointId = checkpointId;
+
+ // Simulate a slow snapshot: without this delay the two reads of count
+ // are back-to-back and a concurrent modification could never be observed.
+ for (long i = 0; i < checkpointDelay; i++) {
+ Thread.yield();
+ }
+
+ if (startCount != count) {
+ semaphore.release();
+ // This means that next() was invoked while the snapshot was ongoing
+ Assert.fail("Count is different at start and end of snapshot.");
+ }
semaphore.release();
- // This means that next() was invoked while the snapshot was ongoing
- Assert.fail("Count is different at start end end of snapshot.");
}
- semaphore.release();
- return sum;
+ return 0;
}
@Override
- public void restoreState(Serializable state) {
-
+ public Integer restoreState(Integer stateSnapshot) {
+ return stateSnapshot;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
index 7713994..ea0cb94 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
@@ -20,12 +20,10 @@ package org.apache.flink.streaming.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -157,8 +155,9 @@ public class MockCoContext<IN1, IN2, OUT> {
public static <IN1, IN2, OUT> List<OUT> createAndExecute(TwoInputStreamOperator<IN1, IN2, OUT> operator,
List<IN1> input1, List<IN2> input2) {
MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2);
- RuntimeContext runtimeContext = new StreamingRuntimeContext("CoMockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
- new ExecutionConfig());
+ StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("CoMockTask",
+ new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
+ new ExecutionConfig(), null);
operator.setup(mockContext.collector, runtimeContext);
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 8b5607f..adec338 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -20,12 +20,10 @@ package org.apache.flink.streaming.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -105,8 +103,9 @@ public class MockContext<IN, OUT> {
public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator,
List<IN> inputs) {
MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
- RuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
- new ExecutionConfig());
+ StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask",
+ new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
+ new ExecutionConfig(), null);
operator.setup(mockContext.output, runtimeContext);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index f0eef9d..8c17eec 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -18,6 +18,10 @@
package org.apache.flink.test.checkpointing;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
@@ -25,24 +29,19 @@ import java.util.Random;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
/**
* A simple test that runs a streaming topology with checkpointing enabled.
*
@@ -182,14 +181,14 @@ public class StreamCheckpointingITCase {
// --------------------------------------------------------------------------------------------
private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
- implements Checkpointed<Long>, ParallelSourceFunction<String> {
+ implements ParallelSourceFunction<String> {
private final long numElements;
private Random rnd;
private StringBuilder stringBuilder;
- private long index;
+ private OperatorState<Integer> index;
private int step;
private volatile boolean isRunning;
@@ -197,7 +196,7 @@ public class StreamCheckpointingITCase {
static final long[] counts = new long[PARALLELISM];
@Override
public void close() {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = index.getState();
}
@@ -211,9 +210,9 @@ public class StreamCheckpointingITCase {
stringBuilder = new StringBuilder();
step = getRuntimeContext().getNumberOfParallelSubtasks();
- if (index == 0) {
- index = getRuntimeContext().getIndexOfThisSubtask();
- }
+
+ index = getRuntimeContext().getOperatorState(getRuntimeContext().getIndexOfThisSubtask());
+
isRunning = true;
}
@@ -221,8 +220,8 @@ public class StreamCheckpointingITCase {
public void run(SourceContext<String> ctx) throws Exception {
final Object lockingObject = ctx.getCheckpointLock();
- while (isRunning && index < numElements) {
- char first = (char) ((index % 40) + 40);
+ while (isRunning && index.getState() < numElements) {
+ char first = (char) ((index.getState() % 40) + 40);
stringBuilder.setLength(0);
stringBuilder.append(first);
@@ -230,7 +229,7 @@ public class StreamCheckpointingITCase {
String result = randomString(stringBuilder, rnd);
synchronized (lockingObject) {
- index += step;
+ index.updateState(index.getState() + step);
ctx.collect(result);
}
}
@@ -241,16 +240,6 @@ public class StreamCheckpointingITCase {
isRunning = false;
}
- @Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return this.index;
- }
-
- @Override
- public void restoreState(Long state) {
- this.index = state;
- }
-
private static String randomString(StringBuilder bld, Random rnd) {
final int len = rnd.nextInt(10) + 5;
@@ -263,35 +252,27 @@ public class StreamCheckpointingITCase {
}
}
- private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount>
- implements Checkpointed<Long> {
-
+ private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> {
+ private OperatorState<Long> count;
+ static final long[] counts = new long[PARALLELISM];
@Override
public PrefixCount map(PrefixCount value) throws Exception {
- count++;
+ count.updateState(count.getState() + 1);
return value;
}
- static final long[] counts = new long[PARALLELISM];
-
- private long count = 0;
-
- @Override
- public void close() {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
- }
-
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return count;
+ public void open(Configuration conf) {
+ count = getRuntimeContext().getOperatorState(0L);
}
@Override
- public void restoreState(Long state) {
- count = state;
+ public void close() {
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState();
}
+
}
private static class OnceFailingReducer extends RichReduceFunction<PrefixCount> {
@@ -353,60 +334,47 @@ public class StreamCheckpointingITCase {
}
}
- private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
+ private static class StringRichFilterFunction extends RichFilterFunction<String> {
+ OperatorState<Long> count;
+ static final long[] counts = new long[PARALLELISM];
+
@Override
public boolean filter(String value) {
- count++;
+ count.updateState(count.getState() + 1);
return value.length() < 100;
}
-
- static final long[] counts = new long[PARALLELISM];
-
- private long count = 0;
-
- @Override
- public void close() {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
- }
-
+
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return count;
+ public void open(Configuration conf) {
+ this.count = getRuntimeContext().getOperatorState(0L);
}
@Override
- public void restoreState(Long state) {
- count = state;
+ public void close() {
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState();
}
}
- private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements Checkpointed<Long> {
-
+ private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> {
+ OperatorState<Long> count;
+ static final long[] counts = new long[PARALLELISM];
+
@Override
public PrefixCount map(String value) {
- count++;
+ count.updateState(count.getState() + 1);
return new PrefixCount(value.substring(0, 1), value, 1L);
}
-
- static final long[] counts = new long[PARALLELISM];
-
- private long count = 0;
-
- @Override
- public void close() {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
- }
-
+
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return count;
+ public void open(Configuration conf) {
+ this.count = getRuntimeContext().getOperatorState(0L);
}
@Override
- public void restoreState(Long state) {
- count = state;
+ public void close() {
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index f153eb7..6003205 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.test.recovery;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
@@ -30,10 +29,10 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FileStateHandle;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -84,7 +83,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
}
}).startNewChain()
// populate the coordinate directory so we can proceed to TaskManager failure
- .map(new StatefulMapper(coordinateDir));
+ .map(new Mapper(coordinateDir));
//write result to temporary file
result.addSink(new CheckpointedSink(DATA_COUNT));
@@ -105,18 +104,16 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
}
}
- public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long>
- implements Checkpointed<Long> {
- private static final long serialVersionUID = 1L;
+ public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long> {
private static final long SLEEP_TIME = 50;
private final File coordinateDir;
private final long end;
- private long collected;
-
private volatile boolean isRunning = true;
+
+ private OperatorState<Long> collected;
public SleepyDurableGenerateSequence(File coordinateDir, long end) {
this.coordinateDir = coordinateDir;
@@ -136,7 +133,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
boolean checkForProceedFile = true;
- while (isRunning && collected < toCollect) {
+ while (isRunning && collected.getState() < toCollect) {
// check if the proceed file exists (then we go full speed)
// if not, we always recheck and sleep
if (checkForProceedFile) {
@@ -149,34 +146,28 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
}
synchronized (checkpointLock) {
- sourceCtx.collect(collected * stepSize + congruence);
- collected++;
+ sourceCtx.collect(collected.getState() * stepSize + congruence);
+ collected.updateState(collected.getState() + 1);
}
}
}
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
+
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return collected;
+ public void open(Configuration conf) {
+ collected = getRuntimeContext().getOperatorState(0L);
}
@Override
- public void restoreState(Long state) {
- collected = state;
+ public void cancel() {
+ isRunning = false;
}
}
- public static class StatefulMapper extends RichMapFunction<Long, Long> implements
- Checkpointed<Integer> {
+
+ public static class Mapper extends RichMapFunction<Long, Long> {
private boolean markerCreated = false;
private File coordinateDir;
- private boolean restored = false;
- public StatefulMapper(File coordinateDir) {
+ public Mapper(File coordinateDir) {
this.coordinateDir = coordinateDir;
}
@@ -189,31 +180,14 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
}
return value;
}
-
- @Override
- public void close() {
- if (!restored) {
- fail();
- }
- }
-
- @Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return 1;
- }
-
- @Override
- public void restoreState(Integer state) {
- restored = true;
- }
}
- private static class CheckpointedSink extends RichSinkFunction<Long> implements Checkpointed<Long> {
+ private static class CheckpointedSink extends RichSinkFunction<Long> {
private long stepSize;
private long congruence;
private long toCollect;
- private long collected = 0L;
+ private OperatorState<Long> collected;
private long end;
public CheckpointedSink(long end) {
@@ -225,30 +199,21 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
congruence = getRuntimeContext().getIndexOfThisSubtask();
toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
+ collected = getRuntimeContext().getOperatorState(0L);
}
@Override
public void invoke(Long value) throws Exception {
- long expected = collected * stepSize + congruence;
+ long expected = collected.getState() * stepSize + congruence;
Assert.assertTrue("Value did not match expected value. " + expected + " != " + value, value.equals(expected));
- collected++;
+ collected.updateState(collected.getState() + 1);
- if (collected > toCollect) {
+ if (collected.getState() > toCollect) {
- Assert.fail("Collected <= toCollect: " + collected + " > " + toCollect);
+ Assert.fail("Collected > toCollect: " + collected.getState() + " > " + toCollect);
}
}
-
- @Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return collected;
- }
-
- @Override
- public void restoreState(Long state) {
- collected = state;
- }
}
}