You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/10 08:49:43 UTC
[04/11] flink git commit: [FLINK-4410] [runtime] Rework checkpoint
stats tracking
http://git-wip-us.apache.org/repos/asf/flink/blob/579bc964/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 07e1644..c5b6697 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -19,11 +19,6 @@
package org.apache.flink.runtime.jobmanager;
import akka.actor.ActorSystem;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -51,8 +46,14 @@ import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
@@ -227,7 +228,7 @@ public class JobSubmitTest {
JobGraph jg = new JobGraph("test job", jobVertex);
jg.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
- 5000, 5000, 0L, 10, ExternalizedCheckpointSettings.none()));
+ 5000, 5000, 0L, 10, ExternalizedCheckpointSettings.none(), true));
return jg;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/579bc964/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 3521630..9aa35e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -73,8 +73,7 @@ public class CheckpointMessagesTest {
CheckpointCoordinatorTest.generateChainedPartitionableStateHandle(new JobVertexID(), 0, 2, 8, false),
null,
CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, Collections.singletonList(new MyHandle())),
- null,
- 0L);
+ null);
AcknowledgeCheckpoint withState = new AcknowledgeCheckpoint(
new JobID(),
http://git-wip-us.apache.org/repos/asf/flink/blob/579bc964/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 4aa0565..60b12d2 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -817,7 +817,12 @@ class JobManagerITCase(_system: ActorSystem)
java.util.Collections.emptyList(),
java.util.Collections.emptyList(),
java.util.Collections.emptyList(),
- 60000, 60000, 60000, 1, ExternalizedCheckpointSettings.none))
+ 60000,
+ 60000,
+ 60000,
+ 1,
+ ExternalizedCheckpointSettings.none,
+ true))
// Submit job...
jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor)
@@ -871,7 +876,12 @@ class JobManagerITCase(_system: ActorSystem)
java.util.Collections.emptyList(),
java.util.Collections.emptyList(),
java.util.Collections.emptyList(),
- 60000, 60000, 60000, 1, ExternalizedCheckpointSettings.none))
+ 60000,
+ 60000,
+ 60000,
+ 1,
+ ExternalizedCheckpointSettings.none,
+ true))
// Submit job...
jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor)
@@ -933,7 +943,12 @@ class JobManagerITCase(_system: ActorSystem)
java.util.Collections.emptyList(),
java.util.Collections.emptyList(),
java.util.Collections.emptyList(),
- 60000, 60000, 60000, 1, ExternalizedCheckpointSettings.none))
+ 60000,
+ 60000,
+ 60000,
+ 1,
+ ExternalizedCheckpointSettings.none,
+ true))
// Submit job...
jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor)
http://git-wip-us.apache.org/repos/asf/flink/blob/579bc964/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index da69b49..0cb7d9a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -524,11 +524,26 @@ public class StreamingJobGraphGenerator {
externalizedCheckpointSettings = ExternalizedCheckpointSettings.none();
}
+ CheckpointingMode mode = cfg.getCheckpointingMode();
+
+ boolean isExactlyOnce;
+ if (mode == CheckpointingMode.EXACTLY_ONCE) {
+ isExactlyOnce = true;
+ } else if (mode == CheckpointingMode.AT_LEAST_ONCE) {
+ isExactlyOnce = false;
+ } else {
+ throw new IllegalStateException("Unexpected checkpointing mode. " +
+ "Did not expect there to be another checkpointing mode besides " +
+ "exactly-once or at-least-once.");
+ }
+
JobSnapshottingSettings settings = new JobSnapshottingSettings(
triggerVertices, ackVertices, commitVertices, interval,
cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
cfg.getMaxConcurrentCheckpoints(),
- externalizedCheckpointSettings);
+ externalizedCheckpointSettings,
+ isExactlyOnce);
+
jobGraph.setSnapshotSettings(settings);
}
}