Posted to commits@flink.apache.org by al...@apache.org on 2017/01/12 16:50:26 UTC
[1/2] flink git commit: [FLINK-5407] IT case for savepoint with iterative job
Repository: flink
Updated Branches:
refs/heads/master fc343e0c3 -> 82ed79999
[FLINK-5407] IT case for savepoint with iterative job
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/82ed7999
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/82ed7999
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/82ed7999
Branch: refs/heads/master
Commit: 82ed799999e3f05ebfd67d69dfb56ff13dbd497a
Parents: 9c6eb57
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Jan 10 16:08:06 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jan 12 17:40:32 2017 +0100
----------------------------------------------------------------------
.../runtime/testingUtils/TestingCluster.scala | 74 ++++++-
.../test/checkpointing/SavepointITCase.java | 198 +++++++++++++++++++
2 files changed, 269 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/82ed7999/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 269a66f..d6215eb 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -18,26 +18,31 @@
package org.apache.flink.runtime.testingUtils
-import java.util.concurrent.{Executor, ExecutorService, TimeUnit, TimeoutException}
+import java.io.IOException
+import java.util.concurrent.{Executor, TimeUnit, TimeoutException}
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.pattern.Patterns._
import akka.pattern.ask
import akka.testkit.CallingThreadDispatcher
+import org.apache.flink.api.common.JobID
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.instance.{ActorGateway, InstanceManager}
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore}
import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.metrics.MetricRegistry
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseSavepoint
import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
import org.apache.flink.runtime.testutils.TestingResourceManager
@@ -281,7 +286,70 @@ class TestingCluster(
}
}
}
-}
+
+ @throws(classOf[IOException])
+ def triggerSavepoint(jobId: JobID): String = {
+ val timeout = AkkaUtils.getTimeout(configuration)
+ triggerSavepoint(jobId, getLeaderGateway(timeout), timeout)
+ }
+
+ @throws(classOf[IOException])
+ def requestSavepoint(savepointPath: String): Savepoint = {
+ val timeout = AkkaUtils.getTimeout(configuration)
+ requestSavepoint(savepointPath, getLeaderGateway(timeout), timeout)
+ }
+
+ @throws(classOf[IOException])
+ def disposeSavepoint(savepointPath: String): Unit = {
+ val timeout = AkkaUtils.getTimeout(configuration)
+ disposeSavepoint(savepointPath, getLeaderGateway(timeout), timeout)
+ }
+
+ @throws(classOf[IOException])
+ def triggerSavepoint(
+ jobId: JobID,
+ jobManager: ActorGateway,
+ timeout: FiniteDuration): String = {
+ val result = Await.result(
+ jobManager.ask(
+ TriggerSavepoint(jobId), timeout), timeout)
+
+ result match {
+ case success: TriggerSavepointSuccess => success.savepointPath
+ case fail: TriggerSavepointFailure => throw new IOException(fail.cause)
+ case _ => throw new IllegalStateException("Trigger savepoint failed")
+ }
+ }
+
+ @throws(classOf[IOException])
+ def requestSavepoint(
+ savepointPath: String,
+ jobManager: ActorGateway,
+ timeout: FiniteDuration): Savepoint = {
+ val result = Await.result(
+ jobManager.ask(
+ TestingJobManagerMessages.RequestSavepoint(savepointPath), timeout), timeout)
+
+ result match {
+ case success: ResponseSavepoint => success.savepoint
+ case _ => throw new IOException("Request savepoint failed")
+ }
+ }
+
+  @throws(classOf[IOException])
+  def disposeSavepoint(
+      savepointPath: String,
+      jobManager: ActorGateway,
+      timeout: FiniteDuration): Unit = {
+    // Ask the given JobManager gateway to dispose the savepoint and wait for the acknowledgement.
+    val result = Await.result(jobManager.ask(DisposeSavepoint(savepointPath), timeout), timeout)
+    result match {
+      case DisposeSavepointSuccess =>
+      case _ => throw new IOException("Dispose savepoint failed")
+    }
+  }
+}
object TestingCluster {
val MAX_RESTART_DURATION = new FiniteDuration(2, TimeUnit.MINUTES)
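For orientation, the helpers added above wrap the JobManager ask pattern so tests can trigger, request, and dispose savepoints without dealing with Akka futures directly. A minimal Java-side usage sketch, assuming a JobGraph has already been built elsewhere (the helper method and its name are illustrative, not part of this commit):

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testingUtils.TestingCluster;

public class SavepointHelperSketch {

    // Hypothetical helper: submits a pre-built JobGraph and exercises the new
    // triggerSavepoint/disposeSavepoint convenience methods added above.
    static String takeSavepoint(JobGraph jobGraph) throws Exception {
        TestingCluster cluster = new TestingCluster(new Configuration(), false);
        String savepointPath = null;
        try {
            cluster.start();
            cluster.submitJobDetached(jobGraph);
            JobID jobId = jobGraph.getJobID();
            // Blocks until the JobManager answers with TriggerSavepointSuccess,
            // or throws IOException on TriggerSavepointFailure.
            savepointPath = cluster.triggerSavepoint(jobId);
            return savepointPath;
        } finally {
            if (savepointPath != null) {
                cluster.disposeSavepoint(savepointPath);
            }
            cluster.stop();
            cluster.awaitTermination();
        }
    }
}

The actual IT case in the second file waits on latches before triggering the savepoint, so the sketch omits that coordination.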
http://git-wip-us.apache.org/repos/asf/flink/blob/82ed7999/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index d52f115..9f957e5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -25,10 +25,16 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskState;
@@ -58,11 +64,17 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseSubmitTaskListener;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -78,6 +90,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@@ -613,4 +626,189 @@ public class SavepointITCase extends TestLogger {
}
}
+ private static final int ITER_TEST_PARALLELISM = 1;
+ private static OneShotLatch[] ITER_TEST_SNAPSHOT_WAIT = new OneShotLatch[ITER_TEST_PARALLELISM];
+ private static OneShotLatch[] ITER_TEST_RESTORE_WAIT = new OneShotLatch[ITER_TEST_PARALLELISM];
+ private static int[] ITER_TEST_CHECKPOINT_VERIFY = new int[ITER_TEST_PARALLELISM];
+
+ @Test
+ public void testSavepointForJobWithIteration() throws Exception {
+
+ for (int i = 0; i < ITER_TEST_PARALLELISM; ++i) {
+ ITER_TEST_SNAPSHOT_WAIT[i] = new OneShotLatch();
+ ITER_TEST_RESTORE_WAIT[i] = new OneShotLatch();
+ ITER_TEST_CHECKPOINT_VERIFY[i] = 0;
+ }
+
+ TemporaryFolder folder = new TemporaryFolder();
+ folder.create();
+ // Temporary directory for file state backend
+ final File tmpDir = folder.newFolder();
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ final IntegerStreamSource source = new IntegerStreamSource();
+ IterativeStream<Integer> iteration = env.addSource(source)
+ .flatMap(new RichFlatMapFunction<Integer, Integer>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(Integer in, Collector<Integer> clctr) throws Exception {
+ clctr.collect(in);
+ }
+ }).setParallelism(ITER_TEST_PARALLELISM)
+ .keyBy(new KeySelector<Integer, Object>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object getKey(Integer value) throws Exception {
+ return value;
+ }
+ })
+ .flatMap(new DuplicateFilter())
+ .setParallelism(ITER_TEST_PARALLELISM)
+ .iterate();
+
+ DataStream<Integer> iterationBody = iteration
+ .map(new MapFunction<Integer, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer map(Integer value) throws Exception {
+ return value;
+ }
+ })
+ .setParallelism(ITER_TEST_PARALLELISM);
+
+ iteration.closeWith(iterationBody);
+
+ StreamGraph streamGraph = env.getStreamGraph();
+ streamGraph.setJobName("Test");
+
+ JobGraph jobGraph = streamGraph.getJobGraph();
+
+ Configuration config = new Configuration();
+ config.addAll(jobGraph.getJobConfiguration());
+ config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2 * jobGraph.getMaximumParallelism());
+ final File checkpointDir = new File(tmpDir, "checkpoints");
+ final File savepointDir = new File(tmpDir, "savepoints");
+
+ if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) {
+ fail("Test setup failed: failed to create temporary directories.");
+ }
+
+ config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+ config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
+ checkpointDir.toURI().toString());
+ config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
+ config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
+ savepointDir.toURI().toString());
+
+ TestingCluster cluster = new TestingCluster(config, false);
+ String savepointPath = null;
+ try {
+ cluster.start();
+
+ cluster.submitJobDetached(jobGraph);
+ for (OneShotLatch latch : ITER_TEST_SNAPSHOT_WAIT) {
+ latch.await();
+ }
+ savepointPath = cluster.triggerSavepoint(jobGraph.getJobID());
+ source.cancel();
+
+ jobGraph = streamGraph.getJobGraph();
+ jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
+
+ cluster.submitJobDetached(jobGraph);
+ for (OneShotLatch latch : ITER_TEST_RESTORE_WAIT) {
+ latch.await();
+ }
+ source.cancel();
+ } finally {
+ if (null != savepointPath) {
+ cluster.disposeSavepoint(savepointPath);
+ }
+ cluster.stop();
+ cluster.awaitTermination();
+ }
+ }
+
+ private static final class IntegerStreamSource
+ extends RichSourceFunction<Integer>
+ implements ListCheckpointed<Integer> {
+
+ private static final long serialVersionUID = 1L;
+ private volatile boolean running;
+ private volatile boolean isRestored;
+ private int emittedCount;
+
+ public IntegerStreamSource() {
+ this.running = true;
+ this.isRestored = false;
+ this.emittedCount = 0;
+ }
+
+ @Override
+ public void run(SourceContext<Integer> ctx) throws Exception {
+
+ while (running) {
+ synchronized (ctx.getCheckpointLock()) {
+ ctx.collect(emittedCount);
+ }
+
+ if (emittedCount < 100) {
+ ++emittedCount;
+ } else {
+ emittedCount = 0;
+ }
+ Thread.sleep(1);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ @Override
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ ITER_TEST_CHECKPOINT_VERIFY[getRuntimeContext().getIndexOfThisSubtask()] = emittedCount;
+ return Collections.singletonList(emittedCount);
+ }
+
+ @Override
+ public void restoreState(List<Integer> state) throws Exception {
+ if (!state.isEmpty()) {
+ this.emittedCount = state.get(0);
+ }
+ Assert.assertEquals(ITER_TEST_CHECKPOINT_VERIFY[getRuntimeContext().getIndexOfThisSubtask()], emittedCount);
+ ITER_TEST_RESTORE_WAIT[getRuntimeContext().getIndexOfThisSubtask()].trigger();
+ }
+ }
+
+ public static class DuplicateFilter extends RichFlatMapFunction<Integer, Integer> {
+
+ static final ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>("seen", Boolean.class, false);
+ private static final long serialVersionUID = 1L;
+ private ValueState<Boolean> operatorState;
+
+ @Override
+ public void open(Configuration configuration) {
+ operatorState = this.getRuntimeContext().getState(descriptor);
+ }
+
+ @Override
+ public void flatMap(Integer value, Collector<Integer> out) throws Exception {
+ if (!operatorState.value()) {
+ out.collect(value);
+ operatorState.update(true);
+ }
+
+ if (30 == value) {
+ ITER_TEST_SNAPSHOT_WAIT[getRuntimeContext().getIndexOfThisSubtask()].trigger();
+ }
+ }
+ }
}
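A side note on the test above: it coordinates with the running job through static OneShotLatch fields, which works because TestingCluster runs the TaskManagers in the same JVM as the test code. Stripped of the Flink job, the handshake is roughly the following standalone sketch (class and variable names are made up for illustration):

import org.apache.flink.core.testutils.OneShotLatch;

public class LatchHandshakeSketch {

    // Static latch shared between the "job" thread and the "test" thread,
    // mirroring ITER_TEST_SNAPSHOT_WAIT in the IT case above.
    private static final OneShotLatch SNAPSHOT_WAIT = new OneShotLatch();

    public static void main(String[] args) throws Exception {
        // Stand-in for the running job: once it has seen the interesting record,
        // it signals the test that a savepoint may be triggered.
        Thread job = new Thread(() -> {
            for (int value = 0; value <= 30; value++) {
                if (value == 30) {
                    SNAPSHOT_WAIT.trigger();
                }
            }
        });
        job.start();

        // Stand-in for the test: block until the job reached that point,
        // then trigger the savepoint (here just a print).
        SNAPSHOT_WAIT.await();
        System.out.println("job reached value 30, savepoint can be triggered");
        job.join();
    }
}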
[2/2] flink git commit: [FLINK-5407] Handle snapshoting null-operator in chain
Posted by al...@apache.org.
[FLINK-5407] Handle snapshoting null-operator in chain
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c6eb579
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c6eb579
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c6eb579
Branch: refs/heads/master
Commit: 9c6eb5793258de15a83f4cf7b13180d370062531
Parents: fc343e0
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Jan 5 14:28:50 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jan 12 17:40:32 2017 +0100
----------------------------------------------------------------------
.../streaming/runtime/tasks/StreamTask.java | 45 ++++++++++++++------
1 file changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9c6eb579/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1c20393..265cb5c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -902,8 +902,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
List<OperatorStateHandle> operatorStatesStream = new ArrayList<>(snapshotInProgressList.size());
for (OperatorSnapshotResult snapshotInProgress : snapshotInProgressList) {
- operatorStatesBackend.add(FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()));
- operatorStatesStream.add(FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()));
+ if (null != snapshotInProgress) {
+ operatorStatesBackend.add(
+ FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()));
+ operatorStatesStream.add(
+ FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()));
+ } else {
+ operatorStatesBackend.add(null);
+ operatorStatesStream.add(null);
+ }
}
final long asyncEndNanos = System.nanoTime();
@@ -950,7 +957,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
public void close() {
// cleanup/release ongoing snapshot operations
for (OperatorSnapshotResult snapshotResult : snapshotInProgressList) {
- snapshotResult.cancel();
+ if (null != snapshotResult) {
+ snapshotResult.cancel();
+ }
}
}
}
@@ -995,14 +1004,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
try {
for (StreamOperator<?> op : allOperators) {
-
- createStreamFactory(op);
- snapshotNonPartitionableState(op);
-
- OperatorSnapshotResult snapshotInProgress =
- op.snapshotState(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), streamFactory);
-
- snapshotInProgressList.add(snapshotInProgress);
+ checkpointStreamOperator(op);
}
if (LOG.isDebugEnabled()) {
@@ -1029,7 +1031,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
if (failed) {
// Cleanup to release resources
for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) {
- operatorSnapshotResult.cancel();
+ if (null != operatorSnapshotResult) {
+ operatorSnapshotResult.cancel();
+ }
}
if (LOG.isDebugEnabled()) {
@@ -1039,7 +1043,24 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
}
+ }
+
+ private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
+ if (null != op) {
+ createStreamFactory(op);
+ snapshotNonPartitionableState(op);
+
+ OperatorSnapshotResult snapshotInProgress = op.snapshotState(
+ checkpointMetaData.getCheckpointId(),
+ checkpointMetaData.getTimestamp(),
+ streamFactory);
+ snapshotInProgressList.add(snapshotInProgress);
+ } else {
+ nonPartitionedStates.add(null);
+ OperatorSnapshotResult emptySnapshotInProgress = new OperatorSnapshotResult();
+ snapshotInProgressList.add(emptySnapshotInProgress);
+ }
}
private void createStreamFactory(StreamOperator<?> operator) throws IOException {
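The change above keeps snapshotInProgressList and the per-operator state lists index-aligned with the operator chain by inserting placeholders for chain positions that hold no operator. In isolation, the guard pattern looks roughly like this self-contained sketch (the Operator interface and helper are hypothetical stand-ins, not Flink API):

import java.util.ArrayList;
import java.util.List;

public class NullGuardedChainSketch {

    // Hypothetical stand-in for a chained operator whose state can be snapshotted.
    interface Operator {
        String snapshot();
    }

    static List<String> snapshotChain(List<Operator> chain) {
        // One entry per chain position, so results stay index-aligned with the
        // operator chain even when a position holds no operator.
        List<String> snapshots = new ArrayList<>(chain.size());
        for (Operator op : chain) {
            if (op != null) {
                snapshots.add(op.snapshot());
            } else {
                // Placeholder keeps the list aligned; downstream code must tolerate nulls.
                snapshots.add(null);
            }
        }
        return snapshots;
    }

    public static void main(String[] args) {
        List<Operator> chain = new ArrayList<>();
        chain.add(() -> "head-state");
        chain.add(null); // e.g. a chain position without a user operator, as in iterative jobs
        chain.add(() -> "tail-state");
        System.out.println(snapshotChain(chain)); // prints [head-state, null, tail-state]
    }
}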