You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/10 08:49:43 UTC

[04/11] flink git commit: [FLINK-4410] [runtime] Rework checkpoint stats tracking

http://git-wip-us.apache.org/repos/asf/flink/blob/579bc964/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 07e1644..c5b6697 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -19,11 +19,6 @@
 package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorSystem;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -51,8 +46,14 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -227,7 +228,7 @@ public class JobSubmitTest {
 
 		JobGraph jg = new JobGraph("test job", jobVertex);
 		jg.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
-			5000, 5000, 0L, 10, ExternalizedCheckpointSettings.none()));
+			5000, 5000, 0L, 10, ExternalizedCheckpointSettings.none(), true));
 		return jg;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/579bc964/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 3521630..9aa35e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -73,8 +73,7 @@ public class CheckpointMessagesTest {
 							CheckpointCoordinatorTest.generateChainedPartitionableStateHandle(new JobVertexID(), 0, 2, 8, false),
 							null,
 							CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, Collections.singletonList(new MyHandle())),
-							null,
-							0L);
+							null);
 
 			AcknowledgeCheckpoint withState = new AcknowledgeCheckpoint(
 					new JobID(),

http://git-wip-us.apache.org/repos/asf/flink/blob/579bc964/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 4aa0565..60b12d2 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -817,7 +817,12 @@ class JobManagerITCase(_system: ActorSystem)
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
-            60000, 60000, 60000, 1, ExternalizedCheckpointSettings.none))
+            60000,
+            60000,
+            60000,
+            1,
+            ExternalizedCheckpointSettings.none,
+            true))
 
           // Submit job...
           jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor)
@@ -871,7 +876,12 @@ class JobManagerITCase(_system: ActorSystem)
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
-            60000, 60000, 60000, 1, ExternalizedCheckpointSettings.none))
+            60000,
+            60000,
+            60000,
+            1,
+            ExternalizedCheckpointSettings.none,
+            true))
 
           // Submit job...
           jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor)
@@ -933,7 +943,12 @@ class JobManagerITCase(_system: ActorSystem)
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
-            60000, 60000, 60000, 1, ExternalizedCheckpointSettings.none))
+            60000,
+            60000,
+            60000,
+            1,
+            ExternalizedCheckpointSettings.none,
+            true))
 
           // Submit job...
           jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor)

http://git-wip-us.apache.org/repos/asf/flink/blob/579bc964/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index da69b49..0cb7d9a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -524,11 +524,26 @@ public class StreamingJobGraphGenerator {
 			externalizedCheckpointSettings = ExternalizedCheckpointSettings.none();
 		}
 
+		CheckpointingMode mode = cfg.getCheckpointingMode();
+
+		boolean isExactlyOnce;
+		if (mode == CheckpointingMode.EXACTLY_ONCE) {
+			isExactlyOnce = true;
+		} else if (mode == CheckpointingMode.AT_LEAST_ONCE) {
+			isExactlyOnce = false;
+		} else {
+			throw new IllegalStateException("Unexpected checkpointing mode. " +
+				"Did not expect there to be another checkpointing mode besides " +
+				"exactly-once or at-least-once.");
+		}
+
 		JobSnapshottingSettings settings = new JobSnapshottingSettings(
 				triggerVertices, ackVertices, commitVertices, interval,
 				cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
 				cfg.getMaxConcurrentCheckpoints(),
-				externalizedCheckpointSettings);
+				externalizedCheckpointSettings,
+				isExactlyOnce);
+
 		jobGraph.setSnapshotSettings(settings);
 	}
 }