You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/09 08:26:43 UTC

[1/4] flink git commit: [FLINK-4329] [distributed runtime] Clean up ScheduleMode

Repository: flink
Updated Branches:
  refs/heads/master f1e9daece -> 9a84b04f0


[FLINK-4329] [distributed runtime] Clean up ScheduleMode

Remove unsopported 'backtracking' option.
Pick better names for 'all' and 'from sources' options.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd98e85d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd98e85d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd98e85d

Branch: refs/heads/master
Commit: cd98e85ddd3c35e5900713266fc38916b53f172d
Parents: 862e7f0
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 5 17:46:51 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 8 19:27:10 2016 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java    |  9 +++------
 .../apache/flink/runtime/jobgraph/JobGraph.java   |  2 +-
 .../apache/flink/runtime/jobgraph/JobStatus.java  |  2 +-
 .../flink/runtime/jobgraph/ScheduleMode.java      | 18 +++++++-----------
 .../runtime/jobmanager/JobManagerITCase.scala     |  2 +-
 .../api/graph/StreamingJobGraphGenerator.java     |  2 +-
 6 files changed, 14 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cd98e85d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index b778fa6..1a0301d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -183,7 +183,7 @@ public class ExecutionGraph {
 	/** The mode of scheduling. Decides how to select the initial set of tasks to be deployed.
 	 * May indicate to deploy all sources, or to deploy everything, or to deploy via backtracking
 	 * from results than need to be materialized. */
-	private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
+	private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
 	/** Flag to indicate whether the Graph has been archived */
 	private boolean isArchived = false;
@@ -717,7 +717,7 @@ public class ExecutionGraph {
 
 			switch (scheduleMode) {
 
-				case FROM_SOURCES:
+				case LAZY_FROM_SOURCES:
 					// simply take the vertices without inputs.
 					for (ExecutionJobVertex ejv : this.tasks.values()) {
 						if (ejv.getJobVertex().isInputVertex()) {
@@ -726,15 +726,12 @@ public class ExecutionGraph {
 					}
 					break;
 
-				case ALL:
+				case EAGER:
 					for (ExecutionJobVertex ejv : getVerticesTopologically()) {
 						ejv.scheduleAll(scheduler, allowQueuedScheduling);
 					}
 					break;
 
-				case BACKTRACKING:
-					// go back from vertices that need computation to the ones we need to run
-					throw new JobException("BACKTRACKING is currently not supported as schedule mode.");
 				default:
 					throw new JobException("Schedule mode is invalid.");
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd98e85d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index f825d5b..942f1e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -91,7 +91,7 @@ public class JobGraph implements Serializable {
 	private boolean allowQueuedScheduling;
 
 	/** The mode in which the job is scheduled */
-	private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
+	private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
 	/** The settings for asynchronous snapshots */
 	private JobSnapshottingSettings snapshotSettings;

http://git-wip-us.apache.org/repos/asf/flink/blob/cd98e85d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index 4ae566d..236a217 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -55,7 +55,7 @@ public enum JobStatus {
 	
 	// --------------------------------------------------------------------------------------------
 
-	enum TerminalState {
+	private enum TerminalState {
 		NON_TERMINAL,
 		LOCALLY,
 		GLOBALLY

http://git-wip-us.apache.org/repos/asf/flink/blob/cd98e85d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
index 330519d..9405067 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
@@ -18,18 +18,14 @@
 
 package org.apache.flink.runtime.jobgraph;
 
+/**
+ * The ScheduleMode decides how tasks of an execution graph are started.  
+ */
 public enum ScheduleMode {
 
-	/**
-	 * Schedule tasks from sources to sinks with lazy deployment of receiving tasks.
-	 */
-	FROM_SOURCES,
-
-	BACKTRACKING,
-
-	/**
-	 * Schedule tasks all at once instead of lazy deployment of receiving tasks.
-	 */
-	ALL
+	/** Schedule tasks lazily from the sources. Downstream tasks are started once their input data are ready */
+	LAZY_FROM_SOURCES,
 
+	/** Schedules all tasks immediately. */
+	EAGER
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cd98e85d/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 0c5534a..a576a58 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -343,7 +343,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       val jobGraph = new JobGraph("Forwarding Job", sender, forwarder, receiver)
 
-      jobGraph.setScheduleMode(ScheduleMode.ALL)
+      jobGraph.setScheduleMode(ScheduleMode.EAGER)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks, 1)
       val jmGateway = cluster.getLeaderGateway(1 seconds)

http://git-wip-us.apache.org/repos/asf/flink/blob/cd98e85d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 3abecc1..28d982c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -113,7 +113,7 @@ public class StreamingJobGraphGenerator {
 		jobGraph = new JobGraph(streamGraph.getJobName());
 
 		// make sure that all vertices start immediately
-		jobGraph.setScheduleMode(ScheduleMode.ALL);
+		jobGraph.setScheduleMode(ScheduleMode.EAGER);
 
 		init();
 


[3/4] flink git commit: [FLINK-4332] [checkpoints] Fix SavepointV1Serializer read() / readFully()

Posted by se...@apache.org.
[FLINK-4332] [checkpoints] Fix SavepointV1Serializer read() / readFully()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f3bab10
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f3bab10
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f3bab10

Branch: refs/heads/master
Commit: 3f3bab10b9ca68eb31a7ef5a31e49145b51006fd
Parents: cd98e85
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 8 19:16:23 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 8 19:28:31 2016 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/savepoint/SavepointV1Serializer.java      | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3f3bab10/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index 8de29a6..fcdc2ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -138,7 +138,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV0> {
 					serializedValue = new SerializedValue<>(null);
 				} else {
 					byte[] serializedData = new byte[length];
-					dis.read(serializedData, 0, length);
+					dis.readFully(serializedData, 0, length);
 					serializedValue = SerializedValue.fromBytes(serializedData);
 				}
 
@@ -165,7 +165,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV0> {
 					serializedValue = new SerializedValue<>(null);
 				} else {
 					byte[] serializedData = new byte[length];
-					dis.read(serializedData, 0, length);
+					dis.readFully(serializedData, 0, length);
 					serializedValue = SerializedValue.fromBytes(serializedData);
 				}
 


[4/4] flink git commit: [FLINK-4333] [checkpoints] Clean up naming mixups for SavepointV0 / SavepointV1 / SavepointV01

Posted by se...@apache.org.
[FLINK-4333] [checkpoints] Clean up naming mixups for SavepointV0 / SavepointV1 / SavepointV01


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a84b04f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a84b04f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a84b04f

Branch: refs/heads/master
Commit: 9a84b04f076f9cdc2fd0037fcc89f31edc596bdd
Parents: 3f3bab1
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 8 19:22:11 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 8 19:28:48 2016 +0200

----------------------------------------------------------------------
 .../savepoint/SavepointSerializers.java         |   2 +-
 .../savepoint/SavepointV0Serializer.java        | 186 +++++++++++++++++++
 .../savepoint/SavepointV1Serializer.java        | 186 -------------------
 .../savepoint/FsSavepointStoreTest.java         |   4 +-
 .../savepoint/SavepointV01SerializerTest.java   |  52 ------
 .../checkpoint/savepoint/SavepointV01Test.java  |  81 --------
 .../savepoint/SavepointV0SerializerTest.java    |  52 ++++++
 .../checkpoint/savepoint/SavepointV0Test.java   |  81 ++++++++
 8 files changed, 322 insertions(+), 322 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9a84b04f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
index cc5c208..d06f3d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
@@ -31,7 +31,7 @@ public class SavepointSerializers {
 	private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = new HashMap<>(1);
 
 	static {
-		SERIALIZERS.put(SavepointV0.VERSION, SavepointV1Serializer.INSTANCE);
+		SERIALIZERS.put(SavepointV0.VERSION, SavepointV0Serializer.INSTANCE);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9a84b04f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java
new file mode 100644
index 0000000..e82b85f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.runtime.checkpoint.KeyGroupState;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Serializer for {@link SavepointV0} instances.
+ *
+ * <p>In contrast to previous savepoint versions, this serializer makes sure
+ * that no default Java serialization is used for serialization. Therefore, we
+ * don't rely on any involved Java classes to stay the same.
+ */
+class SavepointV0Serializer implements SavepointSerializer<SavepointV0> {
+
+	public static final SavepointV0Serializer INSTANCE = new SavepointV0Serializer();
+
+	private SavepointV0Serializer() {
+	}
+
+	@Override
+	public void serialize(SavepointV0 savepoint, DataOutputStream dos) throws IOException {
+		dos.writeLong(savepoint.getCheckpointId());
+
+		Collection<TaskState> taskStates = savepoint.getTaskStates();
+		dos.writeInt(taskStates.size());
+
+		for (TaskState taskState : savepoint.getTaskStates()) {
+			// Vertex ID
+			dos.writeLong(taskState.getJobVertexID().getLowerPart());
+			dos.writeLong(taskState.getJobVertexID().getUpperPart());
+
+			// Parallelism
+			int parallelism = taskState.getParallelism();
+			dos.writeInt(parallelism);
+
+			// Sub task states
+			dos.writeInt(taskState.getNumberCollectedStates());
+
+			for (int i = 0; i < parallelism; i++) {
+				SubtaskState subtaskState = taskState.getState(i);
+
+				if (subtaskState != null) {
+					dos.writeInt(i);
+
+					SerializedValue<?> serializedValue = subtaskState.getState();
+					if (serializedValue == null) {
+						dos.writeInt(-1); // null
+					} else {
+						byte[] serialized = serializedValue.getByteArray();
+						dos.writeInt(serialized.length);
+						dos.write(serialized, 0, serialized.length);
+					}
+
+					dos.writeLong(subtaskState.getStateSize());
+					dos.writeLong(subtaskState.getDuration());
+				}
+			}
+
+			// Key group states
+			dos.writeInt(taskState.getNumberCollectedKvStates());
+
+			for (int i = 0; i < parallelism; i++) {
+				KeyGroupState keyGroupState = taskState.getKvState(i);
+
+				if (keyGroupState != null) {
+					dos.write(i);
+
+					SerializedValue<?> serializedValue = keyGroupState.getKeyGroupState();
+					if (serializedValue == null) {
+						dos.writeInt(-1); // null
+					} else {
+						byte[] serialized = serializedValue.getByteArray();
+						dos.writeInt(serialized.length);
+						dos.write(serialized, 0, serialized.length);
+					}
+
+					dos.writeLong(keyGroupState.getStateSize());
+					dos.writeLong(keyGroupState.getDuration());
+				}
+			}
+		}
+	}
+
+	@Override
+	public SavepointV0 deserialize(DataInputStream dis) throws IOException {
+		long checkpointId = dis.readLong();
+
+		// Task states
+		int numTaskStates = dis.readInt();
+		List<TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+		for (int i = 0; i < numTaskStates; i++) {
+			JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
+			int parallelism = dis.readInt();
+
+			// Add task state
+			TaskState taskState = new TaskState(jobVertexId, parallelism);
+			taskStates.add(taskState);
+
+			// Sub task states
+			int numSubTaskStates = dis.readInt();
+			for (int j = 0; j < numSubTaskStates; j++) {
+				int subtaskIndex = dis.readInt();
+
+				int length = dis.readInt();
+
+				SerializedValue<StateHandle<?>> serializedValue;
+				if (length == -1) {
+					serializedValue = new SerializedValue<>(null);
+				} else {
+					byte[] serializedData = new byte[length];
+					dis.readFully(serializedData, 0, length);
+					serializedValue = SerializedValue.fromBytes(serializedData);
+				}
+
+				long stateSize = dis.readLong();
+				long duration = dis.readLong();
+
+				SubtaskState subtaskState = new SubtaskState(
+						serializedValue,
+						stateSize,
+						duration);
+
+				taskState.putState(subtaskIndex, subtaskState);
+			}
+
+			// Key group states
+			int numKvStates = dis.readInt();
+			for (int j = 0; j < numKvStates; j++) {
+				int keyGroupIndex = dis.readInt();
+
+				int length = dis.readInt();
+
+				SerializedValue<StateHandle<?>> serializedValue;
+				if (length == -1) {
+					serializedValue = new SerializedValue<>(null);
+				} else {
+					byte[] serializedData = new byte[length];
+					dis.readFully(serializedData, 0, length);
+					serializedValue = SerializedValue.fromBytes(serializedData);
+				}
+
+				long stateSize = dis.readLong();
+				long duration = dis.readLong();
+
+				KeyGroupState keyGroupState = new KeyGroupState(
+						serializedValue,
+						stateSize,
+						duration);
+
+				taskState.putKvState(keyGroupIndex, keyGroupState);
+			}
+		}
+
+		return new SavepointV0(checkpointId, taskStates);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9a84b04f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
deleted file mode 100644
index fcdc2ca..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.runtime.checkpoint.KeyGroupState;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Serializer for {@link SavepointV0} instances.
- *
- * <p>In contrast to previous savepoint versions, this serializer makes sure
- * that no default Java serialization is used for serialization. Therefore, we
- * don't rely on any involved Java classes to stay the same.
- */
-class SavepointV1Serializer implements SavepointSerializer<SavepointV0> {
-
-	public static final SavepointV1Serializer INSTANCE = new SavepointV1Serializer();
-
-	private SavepointV1Serializer() {
-	}
-
-	@Override
-	public void serialize(SavepointV0 savepoint, DataOutputStream dos) throws IOException {
-		dos.writeLong(savepoint.getCheckpointId());
-
-		Collection<TaskState> taskStates = savepoint.getTaskStates();
-		dos.writeInt(taskStates.size());
-
-		for (TaskState taskState : savepoint.getTaskStates()) {
-			// Vertex ID
-			dos.writeLong(taskState.getJobVertexID().getLowerPart());
-			dos.writeLong(taskState.getJobVertexID().getUpperPart());
-
-			// Parallelism
-			int parallelism = taskState.getParallelism();
-			dos.writeInt(parallelism);
-
-			// Sub task states
-			dos.writeInt(taskState.getNumberCollectedStates());
-
-			for (int i = 0; i < parallelism; i++) {
-				SubtaskState subtaskState = taskState.getState(i);
-
-				if (subtaskState != null) {
-					dos.writeInt(i);
-
-					SerializedValue<?> serializedValue = subtaskState.getState();
-					if (serializedValue == null) {
-						dos.writeInt(-1); // null
-					} else {
-						byte[] serialized = serializedValue.getByteArray();
-						dos.writeInt(serialized.length);
-						dos.write(serialized, 0, serialized.length);
-					}
-
-					dos.writeLong(subtaskState.getStateSize());
-					dos.writeLong(subtaskState.getDuration());
-				}
-			}
-
-			// Key group states
-			dos.writeInt(taskState.getNumberCollectedKvStates());
-
-			for (int i = 0; i < parallelism; i++) {
-				KeyGroupState keyGroupState = taskState.getKvState(i);
-
-				if (keyGroupState != null) {
-					dos.write(i);
-
-					SerializedValue<?> serializedValue = keyGroupState.getKeyGroupState();
-					if (serializedValue == null) {
-						dos.writeInt(-1); // null
-					} else {
-						byte[] serialized = serializedValue.getByteArray();
-						dos.writeInt(serialized.length);
-						dos.write(serialized, 0, serialized.length);
-					}
-
-					dos.writeLong(keyGroupState.getStateSize());
-					dos.writeLong(keyGroupState.getDuration());
-				}
-			}
-		}
-	}
-
-	@Override
-	public SavepointV0 deserialize(DataInputStream dis) throws IOException {
-		long checkpointId = dis.readLong();
-
-		// Task states
-		int numTaskStates = dis.readInt();
-		List<TaskState> taskStates = new ArrayList<>(numTaskStates);
-
-		for (int i = 0; i < numTaskStates; i++) {
-			JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
-			int parallelism = dis.readInt();
-
-			// Add task state
-			TaskState taskState = new TaskState(jobVertexId, parallelism);
-			taskStates.add(taskState);
-
-			// Sub task states
-			int numSubTaskStates = dis.readInt();
-			for (int j = 0; j < numSubTaskStates; j++) {
-				int subtaskIndex = dis.readInt();
-
-				int length = dis.readInt();
-
-				SerializedValue<StateHandle<?>> serializedValue;
-				if (length == -1) {
-					serializedValue = new SerializedValue<>(null);
-				} else {
-					byte[] serializedData = new byte[length];
-					dis.readFully(serializedData, 0, length);
-					serializedValue = SerializedValue.fromBytes(serializedData);
-				}
-
-				long stateSize = dis.readLong();
-				long duration = dis.readLong();
-
-				SubtaskState subtaskState = new SubtaskState(
-						serializedValue,
-						stateSize,
-						duration);
-
-				taskState.putState(subtaskIndex, subtaskState);
-			}
-
-			// Key group states
-			int numKvStates = dis.readInt();
-			for (int j = 0; j < numKvStates; j++) {
-				int keyGroupIndex = dis.readInt();
-
-				int length = dis.readInt();
-
-				SerializedValue<StateHandle<?>> serializedValue;
-				if (length == -1) {
-					serializedValue = new SerializedValue<>(null);
-				} else {
-					byte[] serializedData = new byte[length];
-					dis.readFully(serializedData, 0, length);
-					serializedValue = SerializedValue.fromBytes(serializedData);
-				}
-
-				long stateSize = dis.readLong();
-				long duration = dis.readLong();
-
-				KeyGroupState keyGroupState = new KeyGroupState(
-						serializedValue,
-						stateSize,
-						duration);
-
-				taskState.putKvState(keyGroupIndex, keyGroupState);
-			}
-		}
-
-		return new SavepointV0(checkpointId, taskStates);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9a84b04f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
index 24ab296..6b8c651 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
@@ -58,7 +58,7 @@ public class FsSavepointStoreTest {
 		assertEquals(0, tmp.getRoot().listFiles().length);
 
 		// Store
-		SavepointV0 stored = new SavepointV0(1929292, SavepointV01Test.createTaskStates(4, 24));
+		SavepointV0 stored = new SavepointV0(1929292, SavepointV0Test.createTaskStates(4, 24));
 		String path = store.storeSavepoint(stored);
 		assertEquals(1, tmp.getRoot().listFiles().length);
 
@@ -122,7 +122,7 @@ public class FsSavepointStoreTest {
 		assertEquals(1, tmp.getRoot().listFiles().length);
 
 		// Savepoint v0
-		Savepoint savepoint = new SavepointV0(checkpointId, SavepointV01Test.createTaskStates(4, 32));
+		Savepoint savepoint = new SavepointV0(checkpointId, SavepointV0Test.createTaskStates(4, 32));
 		String pathSavepoint = store.storeSavepoint(savepoint);
 		assertEquals(2, tmp.getRoot().listFiles().length);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9a84b04f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV01SerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV01SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV01SerializerTest.java
deleted file mode 100644
index 613bb61..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV01SerializerTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-
-import static org.junit.Assert.assertEquals;
-
-public class SavepointV01SerializerTest {
-
-	/**
-	 * Test serialization of {@link SavepointV0} instance.
-	 */
-	@Test
-	public void testSerializeDeserializeV1() throws Exception {
-		SavepointV0 expected = new SavepointV0(123123, SavepointV01Test.createTaskStates(8, 32));
-
-		SavepointV1Serializer serializer = SavepointV1Serializer.INSTANCE;
-
-		// Serialize
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		serializer.serialize(expected, new DataOutputViewStreamWrapper(baos));
-		byte[] bytes = baos.toByteArray();
-
-		// Deserialize
-		ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-		Savepoint actual = serializer.deserialize(new DataInputViewStreamWrapper(bais));
-
-		assertEquals(expected, actual);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9a84b04f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV01Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV01Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV01Test.java
deleted file mode 100644
index 1a66afb..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV01Test.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.CheckpointMessagesTest;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class SavepointV01Test {
-
-	/**
-	 * Simple test of savepoint methods.
-	 */
-	@Test
-	public void testSavepointV1() throws Exception {
-		long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE);
-		int numTaskStates = 4;
-		int numSubtaskStates = 16;
-
-		Collection<TaskState> expected = createTaskStates(numTaskStates, numSubtaskStates);
-
-		SavepointV0 savepoint = new SavepointV0(checkpointId, expected);
-
-		assertEquals(SavepointV0.VERSION, savepoint.getVersion());
-		assertEquals(checkpointId, savepoint.getCheckpointId());
-		assertEquals(expected, savepoint.getTaskStates());
-
-		assertFalse(savepoint.getTaskStates().isEmpty());
-		savepoint.dispose(ClassLoader.getSystemClassLoader());
-		assertTrue(savepoint.getTaskStates().isEmpty());
-	}
-
-	static Collection<TaskState> createTaskStates(int numTaskStates, int numSubtaskStates) throws IOException {
-		List<TaskState> taskStates = new ArrayList<>(numTaskStates);
-
-		for (int i = 0; i < numTaskStates; i++) {
-			TaskState taskState = new TaskState(new JobVertexID(), numSubtaskStates);
-			for (int j = 0; j < numSubtaskStates; j++) {
-				SerializedValue<StateHandle<?>> stateHandle = new SerializedValue<StateHandle<?>>(
-						new CheckpointMessagesTest.MyHandle());
-
-				taskState.putState(i, new SubtaskState(stateHandle, 0, 0));
-			}
-
-			taskStates.add(taskState);
-		}
-
-		return taskStates;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9a84b04f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java
new file mode 100644
index 0000000..b656d90
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+
+import static org.junit.Assert.assertEquals;
+
+public class SavepointV0SerializerTest {
+
+	/**
+	 * Test serialization of {@link SavepointV0} instance.
+	 */
+	@Test
+	public void testSerializeDeserializeV1() throws Exception {
+		SavepointV0 expected = new SavepointV0(123123, SavepointV0Test.createTaskStates(8, 32));
+
+		SavepointV0Serializer serializer = SavepointV0Serializer.INSTANCE;
+
+		// Serialize
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		serializer.serialize(expected, new DataOutputViewStreamWrapper(baos));
+		byte[] bytes = baos.toByteArray();
+
+		// Deserialize
+		ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+		Savepoint actual = serializer.deserialize(new DataInputViewStreamWrapper(bais));
+
+		assertEquals(expected, actual);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9a84b04f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java
new file mode 100644
index 0000000..4d72c42
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.CheckpointMessagesTest;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.SerializedValue;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SavepointV0Test {
+
+	/**
+	 * Simple test of savepoint methods.
+	 */
+	@Test
+	public void testSavepointV0() throws Exception {
+		long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE);
+		int numTaskStates = 4;
+		int numSubtaskStates = 16;
+
+		Collection<TaskState> expected = createTaskStates(numTaskStates, numSubtaskStates);
+
+		SavepointV0 savepoint = new SavepointV0(checkpointId, expected);
+
+		assertEquals(SavepointV0.VERSION, savepoint.getVersion());
+		assertEquals(checkpointId, savepoint.getCheckpointId());
+		assertEquals(expected, savepoint.getTaskStates());
+
+		assertFalse(savepoint.getTaskStates().isEmpty());
+		savepoint.dispose(ClassLoader.getSystemClassLoader());
+		assertTrue(savepoint.getTaskStates().isEmpty());
+	}
+
+	static Collection<TaskState> createTaskStates(int numTaskStates, int numSubtaskStates) throws IOException {
+		List<TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+		for (int i = 0; i < numTaskStates; i++) {
+			TaskState taskState = new TaskState(new JobVertexID(), numSubtaskStates);
+			for (int j = 0; j < numSubtaskStates; j++) {
+				SerializedValue<StateHandle<?>> stateHandle = new SerializedValue<StateHandle<?>>(
+						new CheckpointMessagesTest.MyHandle());
+
+				taskState.putState(i, new SubtaskState(stateHandle, 0, 0));
+			}
+
+			taskStates.add(taskState);
+		}
+
+		return taskStates;
+	}
+
+}


[2/4] flink git commit: [hotfix] [distributed runtime] Add overflow check for ZooKeeper checkpoint ID counter.

Posted by se...@apache.org.
[hotfix] [distributed runtime] Add overflow check for ZooKeeper checkpoint ID counter.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/862e7f0e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/862e7f0e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/862e7f0e

Branch: refs/heads/master
Commit: 862e7f0e9b11c8c218b0fe35fcdb192ea205e2fa
Parents: f1e9dae
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 5 19:16:51 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 8 19:27:10 2016 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/ZooKeeperCheckpointIDCounter.java    | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/862e7f0e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
index c71eb7b..12839c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
@@ -131,8 +131,13 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
 			}
 
 			VersionedValue<Integer> current = sharedCount.getVersionedValue();
+			int newCount = current.getValue() + 1;
 
-			Integer newCount = current.getValue() + 1;
+			if (newCount < 0) {
+				// overflow and wrap around
+				throw new Exception("Checkpoint counter overflow. ZooKeeper checkpoint counter only supports " +
+						"checkpoints Ids up to " + Integer.MAX_VALUE);
+			}
 
 			if (sharedCount.trySetCount(current, newCount)) {
 				return current.getValue();
@@ -161,7 +166,7 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
 	 * Connection state listener. In case of {@link ConnectionState#SUSPENDED} or {@link
 	 * ConnectionState#LOST} we are not guaranteed to read a current count from ZooKeeper.
 	 */
-	private class SharedCountConnectionStateListener implements ConnectionStateListener {
+	private static class SharedCountConnectionStateListener implements ConnectionStateListener {
 
 		private volatile ConnectionState lastState;