You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/20 05:39:33 UTC

[12/17] flink git commit: [FLINK-5113] Port functions in tests to new CheckpointedFunction IF.

http://git-wip-us.apache.org/repos/asf/flink/blob/8b069fde/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
new file mode 100644
index 0000000..60253fa
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading.jar;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+/**
+ * This test is the same as the {@link CheckpointedStreamingProgram} but using the
+ * old and deprecated {@link Checkpointed} interface. It stays here in order to
+ * guarantee that although deprecated, the old Checkpointed interface is still supported.
+ * This is necessary to not break user code.
+ * */
+public class LegacyCheckpointedStreamingProgram {
+
+	private static final int CHECKPOINT_INTERVALL = 100;
+
+	public static void main(String[] args) throws Exception {
+		final String jarFile = args[0];
+		final String host = args[1];
+		final int port = Integer.parseInt(args[2]);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+		env.getConfig().disableSysoutLogging();
+		env.enableCheckpointing(CHECKPOINT_INTERVALL);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10000));
+		env.disableOperatorChaining();
+
+		DataStream<String> text = env.addSource(new SimpleStringGenerator());
+		text.map(new StatefulMapper()).addSink(new NoOpSink());
+		env.setParallelism(1);
+		env.execute("Checkpointed Streaming Program");
+	}
+
+
+	// with Checkpointing
+	public static class SimpleStringGenerator implements SourceFunction<String>, Checkpointed<Integer> {
+
+		private static final long serialVersionUID = 3700033137820808611L;
+
+		public boolean running = true;
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			while(running) {
+				Thread.sleep(1);
+				ctx.collect("someString");
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return null;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+
+		}
+	}
+
+	public static class StatefulMapper implements MapFunction<String, String>, Checkpointed<StatefulMapper>, CheckpointListener {
+
+		private static final long serialVersionUID = 2703630582894634440L;
+
+		private String someState;
+		private boolean atLeastOneSnapshotComplete = false;
+		private boolean restored = false;
+
+		@Override
+		public StatefulMapper snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return this;
+		}
+
+		@Override
+		public void restoreState(StatefulMapper state) {
+			restored = true;
+			this.someState = state.someState;
+			this.atLeastOneSnapshotComplete = state.atLeastOneSnapshotComplete;
+		}
+
+		@Override
+		public String map(String value) throws Exception {
+			if(!atLeastOneSnapshotComplete) {
+				// throttle consumption by the checkpoint interval until we have one snapshot.
+				Thread.sleep(CHECKPOINT_INTERVALL);
+			}
+			if(atLeastOneSnapshotComplete && !restored) {
+				throw new RuntimeException("Intended failure, to trigger restore");
+			}
+			if(restored) {
+				throw new SuccessException();
+				//throw new RuntimeException("All good");
+			}
+			someState = value; // update our state
+			return value;
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			atLeastOneSnapshotComplete = true;
+		}
+	}
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * We intentionally use a user specified failure exception
+	 */
+	public static class SuccessException extends Exception {
+
+		private static final long serialVersionUID = 7073311460437532086L;
+	}
+
+	public static class NoOpSink implements SinkFunction<String> {
+		private static final long serialVersionUID = 2381410324190818620L;
+
+		@Override
+		public void invoke(String value) throws Exception {
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b069fde/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index 4d10bf1..bba218f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -63,6 +63,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
@@ -383,7 +384,7 @@ public class ChaosMonkeyITCase extends TestLogger {
 	}
 
 	public static class CheckpointedSequenceSource extends RichParallelSourceFunction<Long>
-			implements Checkpointed<Long>, CheckpointListener {
+			implements ListCheckpointed<Long>, CheckpointListener {
 
 		private static final long serialVersionUID = 0L;
 
@@ -426,18 +427,20 @@ public class ChaosMonkeyITCase extends TestLogger {
 		}
 
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
 			LOG.info("Snapshotting state {} @ ID {}.", current, checkpointId);
-			return current;
+			return Collections.singletonList(this.current);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			LOG.info("Restoring state {}/{}", state, end);
-			current = state;
+		public void restoreState(List<Long> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			LOG.info("Restoring state {}/{}", state.get(0), end);
+			this.current = state.get(0);
 		}
 
-
 		@Override
 		public void cancel() {
 			isRunning = false;
@@ -453,7 +456,7 @@ public class ChaosMonkeyITCase extends TestLogger {
 	}
 
 	public static class CountingSink extends RichSinkFunction<Long>
-			implements Checkpointed<CountingSink>, CheckpointListener {
+			implements ListCheckpointed<CountingSink>, CheckpointListener {
 
 		private static final Logger LOG = LoggerFactory.getLogger(CountingSink.class);
 
@@ -467,7 +470,6 @@ public class ChaosMonkeyITCase extends TestLogger {
 
 		private int numberOfReceivedLastElements;
 
-
 		public CountingSink(int parallelism, long expectedFinalCount) {
 			this.expectedFinalCount = expectedFinalCount;
 			this.parallelism = parallelism;
@@ -496,16 +498,20 @@ public class ChaosMonkeyITCase extends TestLogger {
 		}
 
 		@Override
-		public CountingSink snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		public List<CountingSink> snapshotState(long checkpointId, long timestamp) throws Exception {
 			LOG.info("Snapshotting state {}:{} @ ID {}.", current, numberOfReceivedLastElements, checkpointId);
-			return this;
+			return Collections.singletonList(this);
 		}
 
 		@Override
-		public void restoreState(CountingSink state) {
-			LOG.info("Restoring state {}:{}", state.current, state.numberOfReceivedLastElements);
-			this.current = state.current;
-			this.numberOfReceivedLastElements = state.numberOfReceivedLastElements;
+		public void restoreState(List<CountingSink> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			CountingSink sink = state.get(0);
+			this.current = sink.current;
+			this.numberOfReceivedLastElements = sink.numberOfReceivedLastElements;
+			LOG.info("Restoring state {}:{}", sink.current, sink.numberOfReceivedLastElements);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8b069fde/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 3f08b5a..f7472e1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -63,6 +63,8 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -460,7 +462,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 	 * A checkpointed source, which emits elements from 0 to a configured number.
 	 */
 	public static class CheckpointedSequenceSource extends RichParallelSourceFunction<Long>
-			implements Checkpointed<Long> {
+			implements ListCheckpointed<Long> {
 
 		private static final Logger LOG = LoggerFactory.getLogger(CheckpointedSequenceSource.class);
 
@@ -500,22 +502,26 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 		}
 
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
 			LOG.debug("Snapshotting state {} @ ID {}.", current, checkpointId);
-			return current;
+			return Collections.singletonList(this.current);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			LOG.debug("Restoring state {}", state);
+		public void restoreState(List<Long> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			Long s = state.get(0);
+			LOG.debug("Restoring state {}", s);
 
 			// This is necessary to make sure that something is recovered at all. Otherwise it
 			// might happen that the job is restarted from the beginning.
-			RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), state);
+			RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), s);
 
 			sync.countDown();
 
-			current = state;
+			current = s;
 		}
 
 		@Override
@@ -528,7 +534,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 	 * A checkpointed sink, which sums up its input and notifies the main thread after all inputs
 	 * are exhausted.
 	 */
-	public static class CountingSink implements SinkFunction<Long>, Checkpointed<CountingSink>,
+	public static class CountingSink implements SinkFunction<Long>, ListCheckpointed<CountingSink>,
 		CheckpointListener {
 
 		private static final Logger LOG = LoggerFactory.getLogger(CountingSink.class);
@@ -558,16 +564,21 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 		}
 
 		@Override
-		public CountingSink snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		public List<CountingSink> snapshotState(long checkpointId, long timestamp) throws Exception {
 			LOG.debug("Snapshotting state {}:{} @ ID {}.", current, numberOfReceivedLastElements, checkpointId);
-			return this;
+			return Collections.singletonList(this);
 		}
 
 		@Override
-		public void restoreState(CountingSink state) {
-			LOG.debug("Restoring state {}:{}", state.current, state.numberOfReceivedLastElements);
-			this.current = state.current;
-			this.numberOfReceivedLastElements = state.numberOfReceivedLastElements;
+		public void restoreState(List<CountingSink> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			CountingSink s = state.get(0);
+			LOG.debug("Restoring state {}:{}", s.current, s.numberOfReceivedLastElements);
+
+			this.current = s.current;
+			this.numberOfReceivedLastElements = s.numberOfReceivedLastElements;
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8b069fde/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
index fcc3d42..987a586 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
@@ -20,6 +20,8 @@ package org.apache.flink.test.recovery;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.commons.io.FileUtils;
@@ -29,7 +31,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -106,7 +108,7 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa
 	}
 
 	public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long> 
-			implements Checkpointed<Long> {
+			implements ListCheckpointed<Long> {
 
 		private static final long SLEEP_TIME = 50;
 
@@ -160,13 +162,16 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa
 		}
 
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return collected;
+		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.collected);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			collected = state;
+		public void restoreState(List<Long> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.collected = state.get(0);
 		}
 	}
 
@@ -189,7 +194,7 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa
 		}
 	}
 
-	private static class CheckpointedSink extends RichSinkFunction<Long> implements Checkpointed<Long> {
+	private static class CheckpointedSink extends RichSinkFunction<Long> implements ListCheckpointed<Long> {
 
 		private long stepSize;
 		private long congruence;
@@ -223,13 +228,16 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa
 		}
 
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return collected;
+		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.collected);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			collected = state;
+		public void restoreState(List<Long> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.collected = state.get(0);
 		}
 	}
 }