You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:58:53 UTC
[02/47] flink git commit: [FLINK-2354] [runtime] Add job graph and
checkpoint recovery
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
deleted file mode 100644
index f517f83..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * A simple test that runs a streaming topology with checkpointing enabled.
- *
- * The test triggers a failure after a while and verifies that, after completion, the
- * state defined with either the {@link OperatorState} or the {@link Checkpointed}
- * interface reflects the "exactly once" semantics.
- *
- * The test throttles the input until at least two checkpoints are completed, to make sure that
- * the recovery does not fall back to "square one" (which would naturally lead to correct
- * results without testing the checkpointing).
- */
-@SuppressWarnings("serial")
-public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
-
- private static final Logger LOG = LoggerFactory.getLogger(StateCheckpoinedITCase.class);
-
- final long NUM_STRINGS = 10_000_000L;
-
- /**
- * Runs the following program:
- *
- * <pre>
- * [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
- * </pre>
- */
- @Override
- public void testProgram(StreamExecutionEnvironment env) {
- assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
-
- final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM);
- final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
-
- final long failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
-
- env.enableCheckpointing(200);
-
- DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
-
- stream
- // first vertex, chained to the source
- // this filter throttles the flow until at least one checkpoint
- // is complete, to make sure this program does not run without
- .filter(new StringRichFilterFunction())
-
- // -------------- seconds vertex - one-to-one connected ----------------
- .map(new StringPrefixCountRichMapFunction())
- .startNewChain()
- .map(new StatefulCounterFunction())
-
- // -------------- third vertex - reducer and the sink ----------------
- .partitionByHash("prefix")
- .flatMap(new OnceFailingAggregator(failurePos))
- .addSink(new ValidatingSink());
- }
-
- @Override
- public void postSubmit() {
-
- //assertTrue("Test inconclusive: failure occurred before first checkpoint",
- // OnceFailingAggregator.wasCheckpointedBeforeFailure);
- if(!OnceFailingAggregator.wasCheckpointedBeforeFailure) {
- LOG.warn("Test inconclusive: failure occurred before first checkpoint");
- }
-
- long filterSum = 0;
- for (long l : StringRichFilterFunction.counts) {
- filterSum += l;
- }
-
- long mapSum = 0;
- for (long l : StringPrefixCountRichMapFunction.counts) {
- mapSum += l;
- }
-
- long countSum = 0;
- for (long l : StatefulCounterFunction.counts) {
- countSum += l;
- }
-
- // verify that we counted exactly right
- assertEquals(NUM_STRINGS, filterSum);
- assertEquals(NUM_STRINGS, mapSum);
- assertEquals(NUM_STRINGS, countSum);
-
- for (Map<Character, Long> map : ValidatingSink.maps) {
- for (Long count : map.values()) {
- assertEquals(NUM_STRINGS / 40, count.longValue());
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Custom Functions
- // --------------------------------------------------------------------------------------------
-
- private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
- implements CheckpointedAsynchronously<Integer>
- {
- private final long numElements;
-
- private int index;
-
- private volatile boolean isRunning = true;
-
-
- StringGeneratingSourceFunction(long numElements) {
- this.numElements = numElements;
- }
-
- @Override
- public void run(SourceContext<String> ctx) throws Exception {
- final Object lockingObject = ctx.getCheckpointLock();
-
- final Random rnd = new Random();
- final StringBuilder stringBuilder = new StringBuilder();
-
- final int step = getRuntimeContext().getNumberOfParallelSubtasks();
-
- if (index == 0) {
- index = getRuntimeContext().getIndexOfThisSubtask();
- }
-
- while (isRunning && index < numElements) {
- char first = (char) ((index % 40) + 40);
-
- stringBuilder.setLength(0);
- stringBuilder.append(first);
-
- String result = randomString(stringBuilder, rnd);
-
- synchronized (lockingObject) {
- index += step;
- ctx.collect(result);
- }
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
- private static String randomString(StringBuilder bld, Random rnd) {
- final int len = rnd.nextInt(10) + 5;
-
- for (int i = 0; i < len; i++) {
- char next = (char) (rnd.nextInt(20000) + 33);
- bld.append(next);
- }
-
- return bld.toString();
- }
-
- @Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return index;
- }
-
- @Override
- public void restoreState(Integer state) {
- index = state;
- }
- }
-
- private static class StringRichFilterFunction extends RichFilterFunction<String>
- implements Checkpointed<Long> {
-
- static final long[] counts = new long[PARALLELISM];
-
- private long count;
-
- @Override
- public boolean filter(String value) throws Exception {
- count++;
- return value.length() < 100; // should be always true
- }
-
- @Override
- public void close() {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
- }
-
- @Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return count;
- }
-
- @Override
- public void restoreState(Long state) {
- count = state;
- }
- }
-
- private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount>
- implements CheckpointedAsynchronously<Long> {
-
- static final long[] counts = new long[PARALLELISM];
-
- private long count;
-
- @Override
- public PrefixCount map(String value) {
- count++;
- return new PrefixCount(value.substring(0, 1), value, 1L);
- }
-
- @Override
- public void close() {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
- }
-
- @Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return count;
- }
-
- @Override
- public void restoreState(Long state) {
- count = state;
- }
- }
-
- private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount>
- implements Checkpointed<Long> {
-
- static final long[] counts = new long[PARALLELISM];
-
- private long count;
-
- @Override
- public PrefixCount map(PrefixCount value) throws Exception {
- count++;
- return value;
- }
-
- @Override
- public void close() throws IOException {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
- }
-
- @Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return count;
- }
-
- @Override
- public void restoreState(Long state) {
- count = state;
- }
- }
-
- private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount>
- implements Checkpointed<HashMap<String, PrefixCount>>, CheckpointNotifier {
-
- static boolean wasCheckpointedBeforeFailure = false;
-
- private static volatile boolean hasFailed = false;
-
- private final HashMap<String, PrefixCount> aggregationMap = new HashMap<String, PrefixCount>();
-
- private long failurePos;
- private long count;
-
- private boolean wasCheckpointed;
-
-
- OnceFailingAggregator(long failurePos) {
- this.failurePos = failurePos;
- }
-
- @Override
- public void open(Configuration parameters) {
- count = 0;
- }
-
- @Override
- public void flatMap(PrefixCount value, Collector<PrefixCount> out) throws Exception {
- count++;
- if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 1) {
- wasCheckpointedBeforeFailure = wasCheckpointed;
- hasFailed = true;
- throw new Exception("Test Failure");
- }
-
- PrefixCount curr = aggregationMap.get(value.prefix);
- if (curr == null) {
- aggregationMap.put(value.prefix, value);
- out.collect(value);
- }
- else {
- curr.count += value.count;
- out.collect(curr);
- }
- }
-
- @Override
- public HashMap<String, PrefixCount> snapshotState(long checkpointId, long checkpointTimestamp) {
- return aggregationMap;
- }
-
- @Override
- public void restoreState(HashMap<String, PrefixCount> state) {
- aggregationMap.putAll(state);
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- this.wasCheckpointed = true;
- }
- }
-
- private static class ValidatingSink extends RichSinkFunction<PrefixCount>
- implements Checkpointed<HashMap<Character, Long>> {
-
- @SuppressWarnings("unchecked")
- private static Map<Character, Long>[] maps = (Map<Character, Long>[]) new Map<?, ?>[PARALLELISM];
-
- private HashMap<Character, Long> counts = new HashMap<Character, Long>();
-
- @Override
- public void invoke(PrefixCount value) {
- Character first = value.prefix.charAt(0);
- Long previous = counts.get(first);
- if (previous == null) {
- counts.put(first, value.count);
- } else {
- counts.put(first, Math.max(previous, value.count));
- }
- }
-
- @Override
- public void close() throws Exception {
- maps[getRuntimeContext().getIndexOfThisSubtask()] = counts;
- }
-
- @Override
- public HashMap<Character, Long> snapshotState(long checkpointId, long checkpointTimestamp) {
- return counts;
- }
-
- @Override
- public void restoreState(HashMap<Character, Long> state) {
- counts.putAll(state);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
new file mode 100644
index 0000000..d7c06f6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * A simple test that runs a streaming topology with checkpointing enabled.
+ *
+ * The test triggers a failure after a while and verifies that, after completion, the
+ * state defined with either the {@link OperatorState} or the {@link Checkpointed}
+ * interface reflects the "exactly once" semantics.
+ *
+ * The test throttles the input until at least two checkpoints are completed, to make sure that
+ * the recovery does not fall back to "square one" (which would naturally lead to correct
+ * results without testing the checkpointing).
+ */
+@SuppressWarnings("serial")
+public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StateCheckpointedITCase.class);
+
+ final long NUM_STRINGS = 10_000_000L;
+
+ /**
+ * Runs the following program:
+ *
+ * <pre>
+ * [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
+ * </pre>
+ */
+ @Override
+ public void testProgram(StreamExecutionEnvironment env) {
+ assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+
+ final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM);
+ final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
+
+ final long failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+
+ env.enableCheckpointing(200);
+
+ DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
+
+ stream
+ // first vertex, chained to the source
+ // this filter throttles the flow until at least one checkpoint
+ // is complete, to make sure this program does not run without any completed checkpoint
+ .filter(new StringRichFilterFunction())
+
+ // -------------- second vertex - one-to-one connected ----------------
+ .map(new StringPrefixCountRichMapFunction())
+ .startNewChain()
+ .map(new StatefulCounterFunction())
+
+ // -------------- third vertex - reducer and the sink ----------------
+ .partitionByHash("prefix")
+ .flatMap(new OnceFailingAggregator(failurePos))
+ .addSink(new ValidatingSink());
+ }
+
+ @Override
+ public void postSubmit() {
+
+ //assertTrue("Test inconclusive: failure occurred before first checkpoint",
+ // OnceFailingAggregator.wasCheckpointedBeforeFailure);
+ if(!OnceFailingAggregator.wasCheckpointedBeforeFailure) {
+ LOG.warn("Test inconclusive: failure occurred before first checkpoint");
+ }
+
+ long filterSum = 0;
+ for (long l : StringRichFilterFunction.counts) {
+ filterSum += l;
+ }
+
+ long mapSum = 0;
+ for (long l : StringPrefixCountRichMapFunction.counts) {
+ mapSum += l;
+ }
+
+ long countSum = 0;
+ for (long l : StatefulCounterFunction.counts) {
+ countSum += l;
+ }
+
+ // verify that we counted exactly right
+ assertEquals(NUM_STRINGS, filterSum);
+ assertEquals(NUM_STRINGS, mapSum);
+ assertEquals(NUM_STRINGS, countSum);
+
+ for (Map<Character, Long> map : ValidatingSink.maps) {
+ for (Long count : map.values()) {
+ assertEquals(NUM_STRINGS / 40, count.longValue());
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Custom Functions
+ // --------------------------------------------------------------------------------------------
+
+ private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
+ implements CheckpointedAsynchronously<Integer>
+ {
+ private final long numElements;
+
+ private int index;
+
+ private volatile boolean isRunning = true;
+
+
+ StringGeneratingSourceFunction(long numElements) {
+ this.numElements = numElements;
+ }
+
+ @Override
+ public void run(SourceContext<String> ctx) throws Exception {
+ final Object lockingObject = ctx.getCheckpointLock();
+
+ final Random rnd = new Random();
+ final StringBuilder stringBuilder = new StringBuilder();
+
+ final int step = getRuntimeContext().getNumberOfParallelSubtasks();
+
+ if (index == 0) {
+ index = getRuntimeContext().getIndexOfThisSubtask();
+ }
+
+ while (isRunning && index < numElements) {
+ char first = (char) ((index % 40) + 40);
+
+ stringBuilder.setLength(0);
+ stringBuilder.append(first);
+
+ String result = randomString(stringBuilder, rnd);
+
+ synchronized (lockingObject) {
+ index += step;
+ ctx.collect(result);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+ private static String randomString(StringBuilder bld, Random rnd) {
+ final int len = rnd.nextInt(10) + 5;
+
+ for (int i = 0; i < len; i++) {
+ char next = (char) (rnd.nextInt(20000) + 33);
+ bld.append(next);
+ }
+
+ return bld.toString();
+ }
+
+ @Override
+ public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+ return index;
+ }
+
+ @Override
+ public void restoreState(Integer state) {
+ index = state;
+ }
+ }
+
+ private static class StringRichFilterFunction extends RichFilterFunction<String>
+ implements Checkpointed<Long> {
+
+ static final long[] counts = new long[PARALLELISM];
+
+ private long count;
+
+ @Override
+ public boolean filter(String value) throws Exception {
+ count++;
+ return value.length() < 100; // should be always true
+ }
+
+ @Override
+ public void close() {
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
+ }
+
+ @Override
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+ return count;
+ }
+
+ @Override
+ public void restoreState(Long state) {
+ count = state;
+ }
+ }
+
+ private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount>
+ implements CheckpointedAsynchronously<Long> {
+
+ static final long[] counts = new long[PARALLELISM];
+
+ private long count;
+
+ @Override
+ public PrefixCount map(String value) {
+ count++;
+ return new PrefixCount(value.substring(0, 1), value, 1L);
+ }
+
+ @Override
+ public void close() {
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
+ }
+
+ @Override
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+ return count;
+ }
+
+ @Override
+ public void restoreState(Long state) {
+ count = state;
+ }
+ }
+
+ private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount>
+ implements Checkpointed<Long> {
+
+ static final long[] counts = new long[PARALLELISM];
+
+ private long count;
+
+ @Override
+ public PrefixCount map(PrefixCount value) throws Exception {
+ count++;
+ return value;
+ }
+
+ @Override
+ public void close() throws IOException {
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
+ }
+
+ @Override
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+ return count;
+ }
+
+ @Override
+ public void restoreState(Long state) {
+ count = state;
+ }
+ }
+
+ private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount>
+ implements Checkpointed<HashMap<String, PrefixCount>>, CheckpointNotifier {
+
+ static boolean wasCheckpointedBeforeFailure = false;
+
+ private static volatile boolean hasFailed = false;
+
+ private final HashMap<String, PrefixCount> aggregationMap = new HashMap<String, PrefixCount>();
+
+ private long failurePos;
+ private long count;
+
+ private boolean wasCheckpointed;
+
+
+ OnceFailingAggregator(long failurePos) {
+ this.failurePos = failurePos;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ count = 0;
+ }
+
+ @Override
+ public void flatMap(PrefixCount value, Collector<PrefixCount> out) throws Exception {
+ count++;
+ if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 1) {
+ wasCheckpointedBeforeFailure = wasCheckpointed;
+ hasFailed = true;
+ throw new Exception("Test Failure");
+ }
+
+ PrefixCount curr = aggregationMap.get(value.prefix);
+ if (curr == null) {
+ aggregationMap.put(value.prefix, value);
+ out.collect(value);
+ }
+ else {
+ curr.count += value.count;
+ out.collect(curr);
+ }
+ }
+
+ @Override
+ public HashMap<String, PrefixCount> snapshotState(long checkpointId, long checkpointTimestamp) {
+ return aggregationMap;
+ }
+
+ @Override
+ public void restoreState(HashMap<String, PrefixCount> state) {
+ aggregationMap.putAll(state);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ this.wasCheckpointed = true;
+ }
+ }
+
+ private static class ValidatingSink extends RichSinkFunction<PrefixCount>
+ implements Checkpointed<HashMap<Character, Long>> {
+
+ @SuppressWarnings("unchecked")
+ private static Map<Character, Long>[] maps = (Map<Character, Long>[]) new Map<?, ?>[PARALLELISM];
+
+ private HashMap<Character, Long> counts = new HashMap<Character, Long>();
+
+ @Override
+ public void invoke(PrefixCount value) {
+ Character first = value.prefix.charAt(0);
+ Long previous = counts.get(first);
+ if (previous == null) {
+ counts.put(first, value.count);
+ } else {
+ counts.put(first, Math.max(previous, value.count));
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ maps[getRuntimeContext().getIndexOfThisSubtask()] = counts;
+ }
+
+ @Override
+ public HashMap<Character, Long> snapshotState(long checkpointId, long checkpointTimestamp) {
+ return counts;
+ }
+
+ @Override
+ public void restoreState(HashMap<Character, Long> state) {
+ counts.putAll(state);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
new file mode 100644
index 0000000..ba5ff1c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderelection.TestingListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
+import org.apache.flink.runtime.testutils.JobManagerProcess;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import scala.Option;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.testutils.CommonTestUtils.createTempDirectory;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * Verify behaviour in case of JobManager process failure during job execution.
+ *
+ * <p>The test works with multiple job manager processes by spawning JVMs.
+ *
+ * <p>Initially, it starts two TaskManagers (2 slots each) and two JobManager JVMs.
+ *
+ * <p>It submits a program with parallelism 4 and waits until all tasks are brought up.
+ * Coordination between the test and the tasks happens via checking for the existence of
+ * temporary files. It then kills the leading JobManager process. The recovery should restart the
+ * tasks on the new JobManager.
+ *
+ * <p>This follows the same structure as {@link AbstractTaskManagerProcessFailureRecoveryTest}.
+ */
+public abstract class AbstractJobManagerProcessFailureRecoveryITCase extends TestLogger {
+
+ private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+
+ private final static FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
+
+ private static final File FileStateBackendBasePath;
+
+ static {
+ try {
+ FileStateBackendBasePath = CommonTestUtils.createTempDirectory();
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Error in test setup. Could not create directory.", e);
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (ZooKeeper != null) {
+ ZooKeeper.shutdown();
+ }
+
+ if (FileStateBackendBasePath != null) {
+ FileUtils.deleteDirectory(FileStateBackendBasePath);
+ }
+ }
+
+ @Before
+ public void cleanUp() throws Exception {
+ ZooKeeper.deleteAll();
+
+ FileUtils.cleanDirectory(FileStateBackendBasePath);
+ }
+
+ protected static final String READY_MARKER_FILE_PREFIX = "ready_";
+ protected static final String FINISH_MARKER_FILE_PREFIX = "finish_";
+ protected static final String PROCEED_MARKER_FILE = "proceed";
+
+ protected static final int PARALLELISM = 4;
+
+ /**
+ * Test program with JobManager failure.
+ *
+ * @param zkQuorum ZooKeeper quorum to connect to
+ * @param coordinateDir Coordination directory
+ * @throws Exception
+ */
+ public abstract void testJobManagerFailure(String zkQuorum, File coordinateDir) throws Exception;
+
+ @Test
+ public void testJobManagerProcessFailure() throws Exception {
+ // Config
+ final int numberOfJobManagers = 2;
+ final int numberOfTaskManagers = 2;
+ final int numberOfSlotsPerTaskManager = 2;
+
+ assertEquals(PARALLELISM, numberOfTaskManagers * numberOfSlotsPerTaskManager);
+
+ // Setup
+ // Test actor system
+ ActorSystem testActorSystem;
+
+ // Job managers
+ final JobManagerProcess[] jmProcess = new JobManagerProcess[numberOfJobManagers];
+
+ // Task managers
+ final ActorSystem[] tmActorSystem = new ActorSystem[numberOfTaskManagers];
+
+ // Leader retrieval service
+ LeaderRetrievalService leaderRetrievalService = null;
+
+ // Coordination between the processes goes through a directory
+ File coordinateTempDir = null;
+
+ try {
+ final Deadline deadline = TestTimeOut.fromNow();
+
+ // Coordination directory
+ coordinateTempDir = createTempDirectory();
+
+ // Job Managers
+ Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+ ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
+
+ // Start first process
+ jmProcess[0] = new JobManagerProcess(0, config);
+ jmProcess[0].createAndStart();
+
+ // Task manager configuration
+ config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+
+ // Start the task managers
+ for (int i = 0; i < numberOfTaskManagers; i++) {
+ tmActorSystem[i] = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+ TaskManager.startTaskManagerComponentsAndActor(
+ config, tmActorSystem[i], "localhost",
+ Option.<String>empty(), Option.<LeaderRetrievalService>empty(),
+ false, StreamingMode.STREAMING, TaskManager.class);
+ }
+
+ // Test actor system
+ testActorSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+
+ jmProcess[0].getActorRef(testActorSystem, deadline.timeLeft());
+
+ // Leader listener
+ TestingListener leaderListener = new TestingListener();
+ leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config);
+ leaderRetrievalService.start(leaderListener);
+
+ // Initial submission
+ leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
+
+ String leaderAddress = leaderListener.getAddress();
+ UUID leaderId = leaderListener.getLeaderSessionID();
+
+ // Get the leader ref
+ ActorRef leaderRef = AkkaUtils.getActorRef(leaderAddress, testActorSystem, deadline.timeLeft());
+ ActorGateway leaderGateway = new AkkaActorGateway(leaderRef, leaderId);
+
+ // Wait for all task managers to connect to the leading job manager
+ JobManagerActorTestUtils.waitForTaskManagers(numberOfTaskManagers, leaderGateway,
+ deadline.timeLeft());
+
+ final File coordinateDirClosure = coordinateTempDir;
+ final Throwable[] errorRef = new Throwable[1];
+
+ // we trigger program execution in a separate thread
+ Thread programTrigger = new Thread("Program Trigger") {
+ @Override
+ public void run() {
+ try {
+ testJobManagerFailure(ZooKeeper.getConnectString(), coordinateDirClosure);
+ }
+ catch (Throwable t) {
+ t.printStackTrace();
+ errorRef[0] = t;
+ }
+ }
+ };
+
+ //start the test program
+ programTrigger.start();
+
+ // wait until all marker files are in place, indicating that all tasks have started
+ AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(coordinateTempDir,
+ READY_MARKER_FILE_PREFIX, PARALLELISM, deadline.timeLeft().toMillis());
+
+ // Kill one of the job managers and trigger recovery
+ jmProcess[0].destroy();
+
+ jmProcess[1] = new JobManagerProcess(1, config);
+ jmProcess[1].createAndStart();
+
+ jmProcess[1].getActorRef(testActorSystem, deadline.timeLeft());
+
+ // we create the marker file which signals the program's tasks that they can complete
+ AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
+
+ programTrigger.join(deadline.timeLeft().toMillis());
+
+ // We wait for the finish marker file. We don't wait for the program trigger, because
+ // we submit in detached mode.
+ AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(coordinateTempDir,
+ FINISH_MARKER_FILE_PREFIX, 1, deadline.timeLeft().toMillis());
+
+ // check that the program really finished
+ assertFalse("The program did not finish in time", programTrigger.isAlive());
+
+ // check whether the program encountered an error
+ if (errorRef[0] != null) {
+ Throwable error = errorRef[0];
+ error.printStackTrace();
+ fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+
+ for (JobManagerProcess p : jmProcess) {
+ if (p != null) {
+ p.printProcessLog();
+ }
+ }
+
+ fail(e.getMessage());
+ }
+ finally {
+ for (int i = 0; i < numberOfTaskManagers; i++) {
+ if (tmActorSystem[i] != null) {
+ tmActorSystem[i].shutdown();
+ }
+ }
+
+ if (leaderRetrievalService != null) {
+ leaderRetrievalService.stop();
+ }
+
+ for (JobManagerProcess jmProces : jmProcess) {
+ if (jmProces != null) {
+ jmProces.destroy();
+ }
+ }
+
+ // Delete coordination directory
+ if (coordinateTempDir != null) {
+ try {
+ FileUtils.deleteDirectory(coordinateTempDir);
+ }
+ catch (Throwable ignored) {
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
deleted file mode 100644
index 7e16baf..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
+++ /dev/null
@@ -1,444 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recovery;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
-import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
-
-/**
- * Abstract base for tests verifying the behavior of the recovery in the
- * case when a TaskManager fails (process is killed) in the middle of a job execution.
- *
- * The test works with multiple task managers processes by spawning JVMs.
- * Initially, it starts a JobManager in process and two TaskManagers JVMs with
- * 2 task slots each.
- * It submits a program with parallelism 4 and waits until all tasks are brought up.
- * Coordination between the test and the tasks happens via checking for the
- * existence of temporary files. It then starts another TaskManager, which is
- * guaranteed to remain empty (all tasks are already deployed) and kills one of
- * the original task managers. The recovery should restart the tasks on the new TaskManager.
- */
-public abstract class AbstractProcessFailureRecoveryTest extends TestLogger {
-
- protected static final String READY_MARKER_FILE_PREFIX = "ready_";
- protected static final String PROCEED_MARKER_FILE = "proceed";
-
- protected static final int PARALLELISM = 4;
-
- @Test
- public void testTaskManagerProcessFailure() {
-
- final StringWriter processOutput1 = new StringWriter();
- final StringWriter processOutput2 = new StringWriter();
- final StringWriter processOutput3 = new StringWriter();
-
- ActorSystem jmActorSystem = null;
- Process taskManagerProcess1 = null;
- Process taskManagerProcess2 = null;
- Process taskManagerProcess3 = null;
-
- File coordinateTempDir = null;
-
- try {
- // check that we run this test only if the java command
- // is available on this machine
- String javaCommand = getJavaCommandPath();
- if (javaCommand == null) {
- System.out.println("---- Skipping Process Failure test : Could not find java executable ----");
- return;
- }
-
- // create a logging file for the process
- File tempLogFile = File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties");
- tempLogFile.deleteOnExit();
- CommonTestUtils.printLog4jDebugConfig(tempLogFile);
-
- // coordination between the processes goes through a directory
- coordinateTempDir = createTempDirectory();
-
- // find a free port to start the JobManager
- final int jobManagerPort = NetUtils.getAvailablePort();
-
- // start a JobManager
- Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
-
- Configuration jmConfig = new Configuration();
- jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
- jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
- jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
- jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s");
-
- jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
- ActorRef jmActor = JobManager.startJobManagerActors(
- jmConfig,
- jmActorSystem,
- StreamingMode.STREAMING,
- JobManager.class,
- MemoryArchivist.class)._1();
-
- // the TaskManager java command
- String[] command = new String[] {
- javaCommand,
- "-Dlog.level=DEBUG",
- "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
- "-Xms80m", "-Xmx80m",
- "-classpath", getCurrentClasspath(),
- TaskManagerProcessEntryPoint.class.getName(),
- String.valueOf(jobManagerPort)
- };
-
- // start the first two TaskManager processes
- taskManagerProcess1 = new ProcessBuilder(command).start();
- new PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1);
- taskManagerProcess2 = new ProcessBuilder(command).start();
- new PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);
-
- // we wait for the JobManager to have the two TaskManagers available
- // since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
- waitUntilNumTaskManagersAreRegistered(jmActor, 2, 120000);
-
- // the program will set a marker file in each of its parallel tasks once they are ready, so that
- // this coordinating code is aware of this.
- // the program will very slowly consume elements until the marker file (later created by the
- // test driver code) is present
- final File coordinateDirClosure = coordinateTempDir;
- final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
- // we trigger program execution in a separate thread
- Thread programTrigger = new Thread("Program Trigger") {
- @Override
- public void run() {
- try {
- testProgram(jobManagerPort, coordinateDirClosure);
- }
- catch (Throwable t) {
- t.printStackTrace();
- errorRef.set(t);
- }
- }
- };
-
- //start the test program
- programTrigger.start();
-
- // wait until all marker files are in place, indicating that all tasks have started
- // max 20 seconds
- if (!waitForMarkerFiles(coordinateTempDir, PARALLELISM, 120000)) {
- // check if the program failed for some reason
- if (errorRef.get() != null) {
- Throwable error = errorRef.get();
- error.printStackTrace();
- fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
- }
- else {
- // no error occurred, simply a timeout
- fail("The tasks were not started within time (" + 120000 + "msecs)");
- }
- }
-
- // start the third TaskManager
- taskManagerProcess3 = new ProcessBuilder(command).start();
- new PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
-
- // we wait for the third TaskManager to register
- // since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
- waitUntilNumTaskManagersAreRegistered(jmActor, 3, 120000);
-
- // kill one of the previous TaskManagers, triggering a failure and recovery
- taskManagerProcess1.destroy();
- taskManagerProcess1 = null;
-
- // we create the marker file which signals the program functions tasks that they can complete
- touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
-
- // wait for at most 5 minutes for the program to complete
- programTrigger.join(300000);
-
- // check that the program really finished
- assertFalse("The program did not finish in time", programTrigger.isAlive());
-
- // check whether the program encountered an error
- if (errorRef.get() != null) {
- Throwable error = errorRef.get();
- error.printStackTrace();
- fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
- }
-
- // all seems well :-)
- }
- catch (Exception e) {
- e.printStackTrace();
- printProcessLog("TaskManager 1", processOutput1.toString());
- printProcessLog("TaskManager 2", processOutput2.toString());
- printProcessLog("TaskManager 3", processOutput3.toString());
- fail(e.getMessage());
- }
- catch (Error e) {
- e.printStackTrace();
- printProcessLog("TaskManager 1", processOutput1.toString());
- printProcessLog("TaskManager 2", processOutput2.toString());
- printProcessLog("TaskManager 3", processOutput3.toString());
- throw e;
- }
- finally {
- if (taskManagerProcess1 != null) {
- taskManagerProcess1.destroy();
- }
- if (taskManagerProcess2 != null) {
- taskManagerProcess2.destroy();
- }
- if (taskManagerProcess3 != null) {
- taskManagerProcess3.destroy();
- }
- if (jmActorSystem != null) {
- jmActorSystem.shutdown();
- }
- if (coordinateTempDir != null) {
- try {
- FileUtils.deleteDirectory(coordinateTempDir);
- }
- catch (Throwable t) {
- // we can ignore this
- }
- }
- }
- }
-
- /**
- * The test program should be implemented here in a form of a separate thread.
- * This provides a solution for checking that it has been terminated.
- *
- * @param jobManagerPort The port for submitting the topology to the local cluster
- * @param coordinateDir TaskManager failure will be triggered only after processes
- * have successfully created file under this directory
- */
- public abstract void testProgram(int jobManagerPort, File coordinateDir) throws Exception;
-
-
- protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
- throws Exception
- {
- final long deadline = System.currentTimeMillis() + maxDelay;
- while (true) {
- long remaining = deadline - System.currentTimeMillis();
- if (remaining <= 0) {
- fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
- }
-
- FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
-
- try {
- Future<?> result = Patterns.ask(jobManager,
- JobManagerMessages.getRequestNumberRegisteredTaskManager(),
- new Timeout(timeout));
- Integer numTMs = (Integer) Await.result(result, timeout);
- if (numTMs == numExpected) {
- break;
- }
- }
- catch (TimeoutException e) {
- // ignore and retry
- }
- catch (ClassCastException e) {
- fail("Wrong response: " + e.getMessage());
- }
- }
- }
-
- protected static void printProcessLog(String processName, String log) {
- if (log == null || log.length() == 0) {
- return;
- }
-
- System.out.println("-----------------------------------------");
- System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
- System.out.println("-----------------------------------------");
- System.out.println(log);
- System.out.println("-----------------------------------------");
- System.out.println(" END SPAWNED PROCESS LOG");
- System.out.println("-----------------------------------------");
- }
-
- protected static File createTempDirectory() throws IOException {
- File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
- for (int i = 0; i < 10; i++) {
- File dir = new File(tempDir, UUID.randomUUID().toString());
- if (!dir.exists() && dir.mkdirs()) {
- return dir;
- }
- System.err.println("Could not use temporary directory " + dir.getAbsolutePath());
- }
-
- throw new IOException("Could not create temporary file directory");
- }
-
- protected static void touchFile(File file) throws IOException {
- if (!file.exists()) {
- new FileOutputStream(file).close();
- }
- if (!file.setLastModified(System.currentTimeMillis())) {
- throw new IOException("Could not touch the file.");
- }
- }
-
- protected static boolean waitForMarkerFiles(File basedir, int num, long timeout) {
- long now = System.currentTimeMillis();
- final long deadline = now + timeout;
-
-
- while (now < deadline) {
- boolean allFound = true;
-
- for (int i = 0; i < num; i++) {
- File nextToCheck = new File(basedir, READY_MARKER_FILE_PREFIX + i);
- if (!nextToCheck.exists()) {
- allFound = false;
- break;
- }
- }
-
- if (allFound) {
- return true;
- }
- else {
- // not all found, wait for a bit
- try {
- Thread.sleep(10);
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
-
- now = System.currentTimeMillis();
- }
- }
-
- return false;
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager.
- */
- public static class TaskManagerProcessEntryPoint {
-
- private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
-
- public static void main(String[] args) {
- try {
- int jobManagerPort = Integer.parseInt(args[0]);
-
- Configuration cfg = new Configuration();
- cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
- cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
- cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
- cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
- cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-
- TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, StreamingMode.STREAMING, TaskManager.class);
-
- // wait forever
- Object lock = new Object();
- synchronized (lock) {
- lock.wait();
- }
- }
- catch (Throwable t) {
- LOG.error("Failed to start TaskManager process", t);
- System.exit(1);
- }
- }
- }
-
- /**
- * Utility class to read the output of a process stream and forward it into a StringWriter.
- */
- protected static class PipeForwarder extends Thread {
-
- private final StringWriter target;
- private final InputStream source;
-
- public PipeForwarder(InputStream source, StringWriter target) {
- super("Pipe Forwarder");
- setDaemon(true);
-
- this.source = source;
- this.target = target;
-
- start();
- }
-
- @Override
- public void run() {
- try {
- int next;
- while ((next = source.read()) != -1) {
- target.write(next);
- }
- }
- catch (IOException e) {
- // terminate
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
new file mode 100644
index 0000000..c02fa6c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * Abstract base for tests verifying the behavior of the recovery in the
+ * case when a TaskManager fails (process is killed) in the middle of a job execution.
+ *
+ * The test works with multiple TaskManager processes by spawning JVMs.
+ * Initially, it starts a JobManager in process and two TaskManager JVMs with
+ * 2 task slots each.
+ * It submits a program with parallelism 4 and waits until all tasks are brought up.
+ * Coordination between the test and the tasks happens via checking for the
+ * existence of temporary files. It then starts another TaskManager, which is
+ * guaranteed to remain empty (all tasks are already deployed) and kills one of
+ * the original task managers. The recovery should restart the tasks on the new TaskManager.
+ */
+public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends TestLogger {
+
+ protected static final String READY_MARKER_FILE_PREFIX = "ready_";
+ protected static final String PROCEED_MARKER_FILE = "proceed";
+ protected static final String FINISH_MARKER_FILE_PREFIX = "finish_";
+
+ protected static final int PARALLELISM = 4;
+
+ @Test
+ public void testTaskManagerProcessFailure() {
+
+ final StringWriter processOutput1 = new StringWriter();
+ final StringWriter processOutput2 = new StringWriter();
+ final StringWriter processOutput3 = new StringWriter();
+
+ ActorSystem jmActorSystem = null;
+ Process taskManagerProcess1 = null;
+ Process taskManagerProcess2 = null;
+ Process taskManagerProcess3 = null;
+
+ File coordinateTempDir = null;
+
+ try {
+ // check that we run this test only if the java command
+ // is available on this machine
+ String javaCommand = getJavaCommandPath();
+ if (javaCommand == null) {
+ System.out.println("---- Skipping Process Failure test : Could not find java executable ----");
+ return;
+ }
+
+ // create a log4j configuration file that is handed to the spawned processes
+ File tempLogFile = File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties");
+ tempLogFile.deleteOnExit();
+ CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+
+ // coordination between the processes goes through a directory
+ coordinateTempDir = CommonTestUtils.createTempDirectory();
+
+ // find a free port to start the JobManager
+ final int jobManagerPort = NetUtils.getAvailablePort();
+
+ // start a JobManager
+ Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
+
+ Configuration jmConfig = new Configuration();
+ jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
+ jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
+ jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
+ jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s");
+
+ jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
+ ActorRef jmActor = JobManager.startJobManagerActors(
+ jmConfig,
+ jmActorSystem,
+ StreamingMode.STREAMING,
+ JobManager.class,
+ MemoryArchivist.class)._1();
+
+ // the TaskManager java command
+ String[] command = new String[] {
+ javaCommand,
+ "-Dlog.level=DEBUG",
+ "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
+ "-Xms80m", "-Xmx80m",
+ "-classpath", getCurrentClasspath(),
+ TaskManagerProcessEntryPoint.class.getName(),
+ String.valueOf(jobManagerPort)
+ };
+
+ // start the first two TaskManager processes
+ taskManagerProcess1 = new ProcessBuilder(command).start();
+ new CommonTestUtils.PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1);
+ taskManagerProcess2 = new ProcessBuilder(command).start();
+ new CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);
+
+ // we wait for the JobManager to have the two TaskManagers available
+ // since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
+ waitUntilNumTaskManagersAreRegistered(jmActor, 2, 120000);
+
+ // the program will set a marker file in each of its parallel tasks once they are ready, so that
+ // this coordinating code is aware of this.
+ // the program will very slowly consume elements until the marker file (later created by the
+ // test driver code) is present
+ final File coordinateDirClosure = coordinateTempDir;
+ final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+ // we trigger program execution in a separate thread
+ Thread programTrigger = new Thread("Program Trigger") {
+ @Override
+ public void run() {
+ try {
+ testTaskManagerFailure(jobManagerPort, coordinateDirClosure);
+ }
+ catch (Throwable t) {
+ t.printStackTrace();
+ errorRef.set(t);
+ }
+ }
+ };
+
+ //start the test program
+ programTrigger.start();
+
+ // wait until all marker files are in place, indicating that all tasks have started
+ // max 2 minutes (120000 msecs)
+ if (!waitForMarkerFiles(coordinateTempDir, READY_MARKER_FILE_PREFIX, PARALLELISM, 120000)) {
+ // check if the program failed for some reason
+ if (errorRef.get() != null) {
+ Throwable error = errorRef.get();
+ error.printStackTrace();
+ fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
+ }
+ else {
+ // no error occurred, simply a timeout
+ fail("The tasks were not started within time (" + 120000 + "msecs)");
+ }
+ }
+
+ // start the third TaskManager
+ taskManagerProcess3 = new ProcessBuilder(command).start();
+ new CommonTestUtils.PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
+
+ // we wait for the third TaskManager to register
+ // since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
+ waitUntilNumTaskManagersAreRegistered(jmActor, 3, 120000);
+
+ // kill one of the previous TaskManagers, triggering a failure and recovery
+ // (the reference is cleared so that the finally block does not destroy it a second time)
+ taskManagerProcess1.destroy();
+ taskManagerProcess1 = null;
+
+ // we create the marker file which signals the program's tasks that they can complete
+ touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
+
+ // wait for at most 5 minutes for the program to complete
+ programTrigger.join(300000);
+
+ // check that the program really finished
+ assertFalse("The program did not finish in time", programTrigger.isAlive());
+
+ // check whether the program encountered an error
+ if (errorRef.get() != null) {
+ Throwable error = errorRef.get();
+ error.printStackTrace();
+ fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
+ }
+
+ // all seems well :-)
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ printProcessLog("TaskManager 1", processOutput1.toString());
+ printProcessLog("TaskManager 2", processOutput2.toString());
+ printProcessLog("TaskManager 3", processOutput3.toString());
+ fail(e.getMessage());
+ }
+ catch (Error e) {
+ e.printStackTrace();
+ printProcessLog("TaskManager 1", processOutput1.toString());
+ printProcessLog("TaskManager 2", processOutput2.toString());
+ printProcessLog("TaskManager 3", processOutput3.toString());
+ throw e;
+ }
+ finally {
+ if (taskManagerProcess1 != null) {
+ taskManagerProcess1.destroy();
+ }
+ if (taskManagerProcess2 != null) {
+ taskManagerProcess2.destroy();
+ }
+ if (taskManagerProcess3 != null) {
+ taskManagerProcess3.destroy();
+ }
+ if (jmActorSystem != null) {
+ jmActorSystem.shutdown();
+ }
+ if (coordinateTempDir != null) {
+ try {
+ FileUtils.deleteDirectory(coordinateTempDir);
+ }
+ catch (Throwable t) {
+ // best-effort cleanup of the coordination directory; a failure here must not fail the test
+ }
+ }
+ }
+ }
+
+ /**
+ * Runs the test program. Subclasses implement the job to execute here.
+ *
+ * The test driver ({@link #testTaskManagerProcessFailure()}) invokes this method from a
+ * separate "Program Trigger" thread, so that the driver can check whether the program has
+ * terminated. Anything thrown by this method is recorded by the driver and fails the test.
+ *
+ * @param jobManagerPort The port of the JobManager that the test driver started on localhost
+ * @param coordinateDir The directory used for coordination via marker files. The test driver
+ * waits for the ready marker files in this directory before it kills
+ * a TaskManager, and later creates the proceed marker file in it
+ * @throws Exception Thrown if the test program fails
+ */
+ public abstract void testTaskManagerFailure(int jobManagerPort, File coordinateDir) throws Exception;
+
+
+ /**
+  * Polls the JobManager until exactly {@code numExpected} TaskManagers are registered,
+  * failing the test if that does not happen within {@code maxDelay} milliseconds.
+  *
+  * Every poll asks the JobManager for the number of registered TaskManagers, using the
+  * time remaining until the deadline as the ask timeout. Between two polls the method
+  * pauses briefly, so that waiting does not spin at full speed and compete for CPU with
+  * the TaskManager JVMs that are still starting up.
+  *
+  * @param jobManager The JobManager actor to query
+  * @param numExpected The number of TaskManagers that must be registered
+  * @param maxDelay The maximum time to wait, in milliseconds
+  * @throws Exception Thrown if the request fails for a reason other than a timeout,
+  *                   or if the thread is interrupted while pausing between polls
+  */
+ protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
+         throws Exception
+ {
+     // pause between two polls, in milliseconds
+     final long pollIntervalMillis = 50;
+
+     final long deadline = System.currentTimeMillis() + maxDelay;
+     while (true) {
+         long remaining = deadline - System.currentTimeMillis();
+         if (remaining <= 0) {
+             // fail() throws an AssertionError, which terminates the loop
+             fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
+         }
+
+         FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
+
+         try {
+             Future<?> result = Patterns.ask(jobManager,
+                     JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+                     new Timeout(timeout));
+             Integer numTMs = (Integer) Await.result(result, timeout);
+             if (numTMs == numExpected) {
+                 break;
+             }
+         }
+         catch (TimeoutException e) {
+             // ignore and retry; the deadline check at the top of the loop ends the wait
+         }
+         catch (ClassCastException e) {
+             fail("Wrong response: " + e.getMessage());
+         }
+
+         // not there yet: back off briefly instead of re-asking immediately
+         Thread.sleep(pollIntervalMillis);
+     }
+ }
+
+ protected static void printProcessLog(String processName, String log) {
+ if (log == null || log.length() == 0) {
+ return;
+ }
+
+ System.out.println("-----------------------------------------");
+ System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
+ System.out.println("-----------------------------------------");
+ System.out.println(log);
+ System.out.println("-----------------------------------------");
+ System.out.println(" END SPAWNED PROCESS LOG");
+ System.out.println("-----------------------------------------");
+ }
+
+ protected static void touchFile(File file) throws IOException {
+ if (!file.exists()) {
+ new FileOutputStream(file).close();
+ }
+ if (!file.setLastModified(System.currentTimeMillis())) {
+ throw new IOException("Could not touch the file.");
+ }
+ }
+
+ /**
+  * Waits until the marker files {@code prefix + 0} to {@code prefix + (num - 1)} all
+  * exist in the given directory, checking every 10 milliseconds.
+  *
+  * @param basedir The directory in which the marker files are expected
+  * @param prefix The file name prefix of the marker files
+  * @param num The number of marker files to wait for
+  * @param timeout The maximum time to wait, in milliseconds
+  * @return {@code true} if all marker files were found before the timeout elapsed,
+  *         {@code false} otherwise
+  * @throws RuntimeException Thrown (wrapping the {@link InterruptedException}) if the
+  *                          thread is interrupted while waiting. The interrupt status of
+  *                          the thread is restored before the exception is thrown.
+  */
+ protected static boolean waitForMarkerFiles(File basedir, String prefix, int num, long timeout) {
+     long now = System.currentTimeMillis();
+     final long deadline = now + timeout;
+
+     while (now < deadline) {
+         boolean allFound = true;
+
+         for (int i = 0; i < num; i++) {
+             File nextToCheck = new File(basedir, prefix + i);
+             if (!nextToCheck.exists()) {
+                 allFound = false;
+                 break;
+             }
+         }
+
+         if (allFound) {
+             return true;
+         }
+
+         // not all found, wait for a bit
+         try {
+             Thread.sleep(10);
+         }
+         catch (InterruptedException e) {
+             // keep the interrupt visible to callers further up the stack
+             Thread.currentThread().interrupt();
+             throw new RuntimeException(e);
+         }
+
+         now = System.currentTimeMillis();
+     }
+
+     return false;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+  * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager.
+  */
+ public static class TaskManagerProcessEntryPoint {
+
+     private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
+
+     /**
+      * Starts a TaskManager with 2 task slots that connects to the JobManager on
+      * localhost at the given port, and then blocks forever. The process is expected
+      * to be terminated from the outside by the test driver.
+      *
+      * If anything goes wrong, the error is logged and the JVM exits with code 1.
+      *
+      * @param args The command line arguments; {@code args[0]} is the JobManager port
+      */
+     public static void main(String[] args) {
+         try {
+             int jobManagerPort = Integer.parseInt(args[0]);
+
+             Configuration cfg = new Configuration();
+             cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+             cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+             cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+             cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+             cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+
+             TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, StreamingMode.STREAMING, TaskManager.class);
+
+             // wait forever; the loop guards against spurious wake-ups of wait(),
+             // which would otherwise let main() return
+             Object lock = new Object();
+             synchronized (lock) {
+                 while (true) {
+                     lock.wait();
+                 }
+             }
+         }
+         catch (Throwable t) {
+             LOG.error("Failed to start TaskManager process", t);
+             System.exit(1);
+         }
+     }
+ }
+
+}