You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/20 05:40:08 UTC
[12/18] flink git commit: [FLINK-5113] Port functions in tests to new
CheckpointedFunction IF.
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
new file mode 100644
index 0000000..60253fa
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading.jar;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+/**
+ * This program is the same as {@link CheckpointedStreamingProgram}, but it uses the
+ * old, deprecated {@link Checkpointed} interface. It is kept in order to guarantee
+ * that the old Checkpointed interface, although deprecated, is still supported.
+ * This is necessary so that existing user code does not break.
+ */
+public class LegacyCheckpointedStreamingProgram {
+
+ private static final int CHECKPOINT_INTERVALL = 100;
+
+ public static void main(String[] args) throws Exception {
+ final String jarFile = args[0];
+ final String host = args[1];
+ final int port = Integer.parseInt(args[2]);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+ env.getConfig().disableSysoutLogging();
+ env.enableCheckpointing(CHECKPOINT_INTERVALL);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10000));
+ env.disableOperatorChaining();
+
+ DataStream<String> text = env.addSource(new SimpleStringGenerator());
+ text.map(new StatefulMapper()).addSink(new NoOpSink());
+ env.setParallelism(1);
+ env.execute("Checkpointed Streaming Program");
+ }
+
+
+ // Source function that implements the legacy Checkpointed interface.
+ public static class SimpleStringGenerator implements SourceFunction<String>, Checkpointed<Integer> {
+
+ private static final long serialVersionUID = 3700033137820808611L;
+
+ public boolean running = true;
+
+ @Override
+ public void run(SourceContext<String> ctx) throws Exception {
+ while(running) {
+ Thread.sleep(1);
+ ctx.collect("someString");
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ @Override
+ public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ return null;
+ }
+
+ @Override
+ public void restoreState(Integer state) {
+
+ }
+ }
+
+ public static class StatefulMapper implements MapFunction<String, String>, Checkpointed<StatefulMapper>, CheckpointListener {
+
+ private static final long serialVersionUID = 2703630582894634440L;
+
+ private String someState;
+ private boolean atLeastOneSnapshotComplete = false;
+ private boolean restored = false;
+
+ @Override
+ public StatefulMapper snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ return this;
+ }
+
+ @Override
+ public void restoreState(StatefulMapper state) {
+ restored = true;
+ this.someState = state.someState;
+ this.atLeastOneSnapshotComplete = state.atLeastOneSnapshotComplete;
+ }
+
+ @Override
+ public String map(String value) throws Exception {
+ if(!atLeastOneSnapshotComplete) {
+ // throttle consumption by the checkpoint interval until we have one snapshot.
+ Thread.sleep(CHECKPOINT_INTERVALL);
+ }
+ if(atLeastOneSnapshotComplete && !restored) {
+ throw new RuntimeException("Intended failure, to trigger restore");
+ }
+ if(restored) {
+ throw new SuccessException();
+ //throw new RuntimeException("All good");
+ }
+ someState = value; // update our state
+ return value;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ atLeastOneSnapshotComplete = true;
+ }
+ }
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * We intentionally use a user-specified failure exception.
+ */
+ public static class SuccessException extends Exception {
+
+ private static final long serialVersionUID = 7073311460437532086L;
+ }
+
+ public static class NoOpSink implements SinkFunction<String> {
+ private static final long serialVersionUID = 2381410324190818620L;
+
+ @Override
+ public void invoke(String value) throws Exception {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index 4d10bf1..bba218f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -63,6 +63,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@@ -383,7 +384,7 @@ public class ChaosMonkeyITCase extends TestLogger {
}
public static class CheckpointedSequenceSource extends RichParallelSourceFunction<Long>
- implements Checkpointed<Long>, CheckpointListener {
+ implements ListCheckpointed<Long>, CheckpointListener {
private static final long serialVersionUID = 0L;
@@ -426,18 +427,20 @@ public class ChaosMonkeyITCase extends TestLogger {
}
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
LOG.info("Snapshotting state {} @ ID {}.", current, checkpointId);
- return current;
+ return Collections.singletonList(this.current);
}
@Override
- public void restoreState(Long state) {
- LOG.info("Restoring state {}/{}", state, end);
- current = state;
+ public void restoreState(List<Long> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ LOG.info("Restoring state {}/{}", state.get(0), end);
+ this.current = state.get(0);
}
-
@Override
public void cancel() {
isRunning = false;
@@ -453,7 +456,7 @@ public class ChaosMonkeyITCase extends TestLogger {
}
public static class CountingSink extends RichSinkFunction<Long>
- implements Checkpointed<CountingSink>, CheckpointListener {
+ implements ListCheckpointed<CountingSink>, CheckpointListener {
private static final Logger LOG = LoggerFactory.getLogger(CountingSink.class);
@@ -467,7 +470,6 @@ public class ChaosMonkeyITCase extends TestLogger {
private int numberOfReceivedLastElements;
-
public CountingSink(int parallelism, long expectedFinalCount) {
this.expectedFinalCount = expectedFinalCount;
this.parallelism = parallelism;
@@ -496,16 +498,20 @@ public class ChaosMonkeyITCase extends TestLogger {
}
@Override
- public CountingSink snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ public List<CountingSink> snapshotState(long checkpointId, long timestamp) throws Exception {
LOG.info("Snapshotting state {}:{} @ ID {}.", current, numberOfReceivedLastElements, checkpointId);
- return this;
+ return Collections.singletonList(this);
}
@Override
- public void restoreState(CountingSink state) {
- LOG.info("Restoring state {}:{}", state.current, state.numberOfReceivedLastElements);
- this.current = state.current;
- this.numberOfReceivedLastElements = state.numberOfReceivedLastElements;
+ public void restoreState(List<CountingSink> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ CountingSink sink = state.get(0);
+ this.current = sink.current;
+ this.numberOfReceivedLastElements = sink.numberOfReceivedLastElements;
+ LOG.info("Restoring state {}:{}", sink.current, sink.numberOfReceivedLastElements);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 418aa51..60a3a62 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -63,6 +63,8 @@ import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -460,7 +462,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
* A checkpointed source, which emits elements from 0 to a configured number.
*/
public static class CheckpointedSequenceSource extends RichParallelSourceFunction<Long>
- implements Checkpointed<Long> {
+ implements ListCheckpointed<Long> {
private static final Logger LOG = LoggerFactory.getLogger(CheckpointedSequenceSource.class);
@@ -500,22 +502,26 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
}
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
LOG.debug("Snapshotting state {} @ ID {}.", current, checkpointId);
- return current;
+ return Collections.singletonList(this.current);
}
@Override
- public void restoreState(Long state) {
- LOG.debug("Restoring state {}", state);
+ public void restoreState(List<Long> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ Long s = state.get(0);
+ LOG.debug("Restoring state {}", s);
// This is necessary to make sure that something is recovered at all. Otherwise it
// might happen that the job is restarted from the beginning.
- RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), state);
+ RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), s);
sync.countDown();
- current = state;
+ current = s;
}
@Override
@@ -528,7 +534,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
* A checkpointed sink, which sums up its input and notifies the main thread after all inputs
* are exhausted.
*/
- public static class CountingSink implements SinkFunction<Long>, Checkpointed<CountingSink>,
+ public static class CountingSink implements SinkFunction<Long>, ListCheckpointed<CountingSink>,
CheckpointListener {
private static final Logger LOG = LoggerFactory.getLogger(CountingSink.class);
@@ -558,16 +564,21 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
}
@Override
- public CountingSink snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ public List<CountingSink> snapshotState(long checkpointId, long timestamp) throws Exception {
LOG.debug("Snapshotting state {}:{} @ ID {}.", current, numberOfReceivedLastElements, checkpointId);
- return this;
+ return Collections.singletonList(this);
}
@Override
- public void restoreState(CountingSink state) {
- LOG.debug("Restoring state {}:{}", state.current, state.numberOfReceivedLastElements);
- this.current = state.current;
- this.numberOfReceivedLastElements = state.numberOfReceivedLastElements;
+ public void restoreState(List<CountingSink> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ CountingSink s = state.get(0);
+ LOG.debug("Restoring state {}:{}", s.current, s.numberOfReceivedLastElements);
+
+ this.current = s.current;
+ this.numberOfReceivedLastElements = s.numberOfReceivedLastElements;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
index fcc3d42..987a586 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
@@ -20,6 +20,8 @@ package org.apache.flink.test.recovery;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
@@ -29,7 +31,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -106,7 +108,7 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa
}
public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long>
- implements Checkpointed<Long> {
+ implements ListCheckpointed<Long> {
private static final long SLEEP_TIME = 50;
@@ -160,13 +162,16 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa
}
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return collected;
+ public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.collected);
}
@Override
- public void restoreState(Long state) {
- collected = state;
+ public void restoreState(List<Long> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.collected = state.get(0);
}
}
@@ -189,7 +194,7 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa
}
}
- private static class CheckpointedSink extends RichSinkFunction<Long> implements Checkpointed<Long> {
+ private static class CheckpointedSink extends RichSinkFunction<Long> implements ListCheckpointed<Long> {
private long stepSize;
private long congruence;
@@ -223,13 +228,16 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa
}
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return collected;
+ public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.collected);
}
@Override
- public void restoreState(Long state) {
- collected = state;
+ public void restoreState(List<Long> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.collected = state.get(0);
}
}
}