You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/09 08:26:43 UTC
[1/4] flink git commit: [FLINK-4329] [distributed runtime] Clean up
ScheduleMode
Repository: flink
Updated Branches:
refs/heads/master f1e9daece -> 9a84b04f0
[FLINK-4329] [distributed runtime] Clean up ScheduleMode
Remove unsopported 'backtracking' option.
Pick better names for 'all' and 'from sources' options.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd98e85d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd98e85d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd98e85d
Branch: refs/heads/master
Commit: cd98e85ddd3c35e5900713266fc38916b53f172d
Parents: 862e7f0
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 5 17:46:51 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 8 19:27:10 2016 +0200
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionGraph.java | 9 +++------
.../apache/flink/runtime/jobgraph/JobGraph.java | 2 +-
.../apache/flink/runtime/jobgraph/JobStatus.java | 2 +-
.../flink/runtime/jobgraph/ScheduleMode.java | 18 +++++++-----------
.../runtime/jobmanager/JobManagerITCase.scala | 2 +-
.../api/graph/StreamingJobGraphGenerator.java | 2 +-
6 files changed, 14 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cd98e85d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index b778fa6..1a0301d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -183,7 +183,7 @@ public class ExecutionGraph {
/** The mode of scheduling. Decides how to select the initial set of tasks to be deployed.
* May indicate to deploy all sources, or to deploy everything, or to deploy via backtracking
* from results than need to be materialized. */
- private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
+ private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
/** Flag to indicate whether the Graph has been archived */
private boolean isArchived = false;
@@ -717,7 +717,7 @@ public class ExecutionGraph {
switch (scheduleMode) {
- case FROM_SOURCES:
+ case LAZY_FROM_SOURCES:
// simply take the vertices without inputs.
for (ExecutionJobVertex ejv : this.tasks.values()) {
if (ejv.getJobVertex().isInputVertex()) {
@@ -726,15 +726,12 @@ public class ExecutionGraph {
}
break;
- case ALL:
+ case EAGER:
for (ExecutionJobVertex ejv : getVerticesTopologically()) {
ejv.scheduleAll(scheduler, allowQueuedScheduling);
}
break;
- case BACKTRACKING:
- // go back from vertices that need computation to the ones we need to run
- throw new JobException("BACKTRACKING is currently not supported as schedule mode.");
default:
throw new JobException("Schedule mode is invalid.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cd98e85d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index f825d5b..942f1e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -91,7 +91,7 @@ public class JobGraph implements Serializable {
private boolean allowQueuedScheduling;
/** The mode in which the job is scheduled */
- private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
+ private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
/** The settings for asynchronous snapshots */
private JobSnapshottingSettings snapshotSettings;
http://git-wip-us.apache.org/repos/asf/flink/blob/cd98e85d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index 4ae566d..236a217 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -55,7 +55,7 @@ public enum JobStatus {
// --------------------------------------------------------------------------------------------
- enum TerminalState {
+ private enum TerminalState {
NON_TERMINAL,
LOCALLY,
GLOBALLY
http://git-wip-us.apache.org/repos/asf/flink/blob/cd98e85d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
index 330519d..9405067 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
@@ -18,18 +18,14 @@
package org.apache.flink.runtime.jobgraph;
+/**
+ * The ScheduleMode decides how tasks of an execution graph are started.
+ */
public enum ScheduleMode {
- /**
- * Schedule tasks from sources to sinks with lazy deployment of receiving tasks.
- */
- FROM_SOURCES,
-
- BACKTRACKING,
-
- /**
- * Schedule tasks all at once instead of lazy deployment of receiving tasks.
- */
- ALL
+ /** Schedule tasks lazily from the sources. Downstream tasks are started once their input data are ready */
+ LAZY_FROM_SOURCES,
+ /** Schedules all tasks immediately. */
+ EAGER
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cd98e85d/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 0c5534a..a576a58 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -343,7 +343,7 @@ class JobManagerITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Forwarding Job", sender, forwarder, receiver)
- jobGraph.setScheduleMode(ScheduleMode.ALL)
+ jobGraph.setScheduleMode(ScheduleMode.EAGER)
val cluster = TestingUtils.startTestingCluster(num_tasks, 1)
val jmGateway = cluster.getLeaderGateway(1 seconds)
http://git-wip-us.apache.org/repos/asf/flink/blob/cd98e85d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 3abecc1..28d982c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -113,7 +113,7 @@ public class StreamingJobGraphGenerator {
jobGraph = new JobGraph(streamGraph.getJobName());
// make sure that all vertices start immediately
- jobGraph.setScheduleMode(ScheduleMode.ALL);
+ jobGraph.setScheduleMode(ScheduleMode.EAGER);
init();
[3/4] flink git commit: [FLINK-4332] [checkpoints] Fix
SavepointV1Serializer read() / readFully()
Posted by se...@apache.org.
[FLINK-4332] [checkpoints] Fix SavepointV1Serializer read() / readFully()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f3bab10
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f3bab10
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f3bab10
Branch: refs/heads/master
Commit: 3f3bab10b9ca68eb31a7ef5a31e49145b51006fd
Parents: cd98e85
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 8 19:16:23 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 8 19:28:31 2016 +0200
----------------------------------------------------------------------
.../runtime/checkpoint/savepoint/SavepointV1Serializer.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3f3bab10/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index 8de29a6..fcdc2ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -138,7 +138,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV0> {
serializedValue = new SerializedValue<>(null);
} else {
byte[] serializedData = new byte[length];
- dis.read(serializedData, 0, length);
+ dis.readFully(serializedData, 0, length);
serializedValue = SerializedValue.fromBytes(serializedData);
}
@@ -165,7 +165,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV0> {
serializedValue = new SerializedValue<>(null);
} else {
byte[] serializedData = new byte[length];
- dis.read(serializedData, 0, length);
+ dis.readFully(serializedData, 0, length);
serializedValue = SerializedValue.fromBytes(serializedData);
}
[4/4] flink git commit: [FLINK-4333] [checkpoints] Clean up naming
mixups for SavepointV0 / SavepointV1 / SavepointV01
Posted by se...@apache.org.
[FLINK-4333] [checkpoints] Clean up naming mixups for SavepointV0 / SavepointV1 / SavepointV01
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a84b04f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a84b04f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a84b04f
Branch: refs/heads/master
Commit: 9a84b04f076f9cdc2fd0037fcc89f31edc596bdd
Parents: 3f3bab1
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 8 19:22:11 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 8 19:28:48 2016 +0200
----------------------------------------------------------------------
.../savepoint/SavepointSerializers.java | 2 +-
.../savepoint/SavepointV0Serializer.java | 186 +++++++++++++++++++
.../savepoint/SavepointV1Serializer.java | 186 -------------------
.../savepoint/FsSavepointStoreTest.java | 4 +-
.../savepoint/SavepointV01SerializerTest.java | 52 ------
.../checkpoint/savepoint/SavepointV01Test.java | 81 --------
.../savepoint/SavepointV0SerializerTest.java | 52 ++++++
.../checkpoint/savepoint/SavepointV0Test.java | 81 ++++++++
8 files changed, 322 insertions(+), 322 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9a84b04f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
index cc5c208..d06f3d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
@@ -31,7 +31,7 @@ public class SavepointSerializers {
private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = new HashMap<>(1);
static {
- SERIALIZERS.put(SavepointV0.VERSION, SavepointV1Serializer.INSTANCE);
+ SERIALIZERS.put(SavepointV0.VERSION, SavepointV0Serializer.INSTANCE);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9a84b04f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java
new file mode 100644
index 0000000..e82b85f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.runtime.checkpoint.KeyGroupState;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Serializer for {@link SavepointV0} instances.
+ *
+ * <p>In contrast to previous savepoint versions, this serializer makes sure
+ * that no default Java serialization is used for serialization. Therefore, we
+ * don't rely on any involved Java classes to stay the same.
+ */
+class SavepointV0Serializer implements SavepointSerializer<SavepointV0> {
+
+ public static final SavepointV0Serializer INSTANCE = new SavepointV0Serializer();
+
+ private SavepointV0Serializer() {
+ }
+
+ @Override
+ public void serialize(SavepointV0 savepoint, DataOutputStream dos) throws IOException {
+ dos.writeLong(savepoint.getCheckpointId());
+
+ Collection<TaskState> taskStates = savepoint.getTaskStates();
+ dos.writeInt(taskStates.size());
+
+ for (TaskState taskState : savepoint.getTaskStates()) {
+ // Vertex ID
+ dos.writeLong(taskState.getJobVertexID().getLowerPart());
+ dos.writeLong(taskState.getJobVertexID().getUpperPart());
+
+ // Parallelism
+ int parallelism = taskState.getParallelism();
+ dos.writeInt(parallelism);
+
+ // Sub task states
+ dos.writeInt(taskState.getNumberCollectedStates());
+
+ for (int i = 0; i < parallelism; i++) {
+ SubtaskState subtaskState = taskState.getState(i);
+
+ if (subtaskState != null) {
+ dos.writeInt(i);
+
+ SerializedValue<?> serializedValue = subtaskState.getState();
+ if (serializedValue == null) {
+ dos.writeInt(-1); // null
+ } else {
+ byte[] serialized = serializedValue.getByteArray();
+ dos.writeInt(serialized.length);
+ dos.write(serialized, 0, serialized.length);
+ }
+
+ dos.writeLong(subtaskState.getStateSize());
+ dos.writeLong(subtaskState.getDuration());
+ }
+ }
+
+ // Key group states
+ dos.writeInt(taskState.getNumberCollectedKvStates());
+
+ for (int i = 0; i < parallelism; i++) {
+ KeyGroupState keyGroupState = taskState.getKvState(i);
+
+ if (keyGroupState != null) {
+ dos.write(i);
+
+ SerializedValue<?> serializedValue = keyGroupState.getKeyGroupState();
+ if (serializedValue == null) {
+ dos.writeInt(-1); // null
+ } else {
+ byte[] serialized = serializedValue.getByteArray();
+ dos.writeInt(serialized.length);
+ dos.write(serialized, 0, serialized.length);
+ }
+
+ dos.writeLong(keyGroupState.getStateSize());
+ dos.writeLong(keyGroupState.getDuration());
+ }
+ }
+ }
+ }
+
+ @Override
+ public SavepointV0 deserialize(DataInputStream dis) throws IOException {
+ long checkpointId = dis.readLong();
+
+ // Task states
+ int numTaskStates = dis.readInt();
+ List<TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+ for (int i = 0; i < numTaskStates; i++) {
+ JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
+ int parallelism = dis.readInt();
+
+ // Add task state
+ TaskState taskState = new TaskState(jobVertexId, parallelism);
+ taskStates.add(taskState);
+
+ // Sub task states
+ int numSubTaskStates = dis.readInt();
+ for (int j = 0; j < numSubTaskStates; j++) {
+ int subtaskIndex = dis.readInt();
+
+ int length = dis.readInt();
+
+ SerializedValue<StateHandle<?>> serializedValue;
+ if (length == -1) {
+ serializedValue = new SerializedValue<>(null);
+ } else {
+ byte[] serializedData = new byte[length];
+ dis.readFully(serializedData, 0, length);
+ serializedValue = SerializedValue.fromBytes(serializedData);
+ }
+
+ long stateSize = dis.readLong();
+ long duration = dis.readLong();
+
+ SubtaskState subtaskState = new SubtaskState(
+ serializedValue,
+ stateSize,
+ duration);
+
+ taskState.putState(subtaskIndex, subtaskState);
+ }
+
+ // Key group states
+ int numKvStates = dis.readInt();
+ for (int j = 0; j < numKvStates; j++) {
+ int keyGroupIndex = dis.readInt();
+
+ int length = dis.readInt();
+
+ SerializedValue<StateHandle<?>> serializedValue;
+ if (length == -1) {
+ serializedValue = new SerializedValue<>(null);
+ } else {
+ byte[] serializedData = new byte[length];
+ dis.readFully(serializedData, 0, length);
+ serializedValue = SerializedValue.fromBytes(serializedData);
+ }
+
+ long stateSize = dis.readLong();
+ long duration = dis.readLong();
+
+ KeyGroupState keyGroupState = new KeyGroupState(
+ serializedValue,
+ stateSize,
+ duration);
+
+ taskState.putKvState(keyGroupIndex, keyGroupState);
+ }
+ }
+
+ return new SavepointV0(checkpointId, taskStates);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a84b04f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
deleted file mode 100644
index fcdc2ca..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.runtime.checkpoint.KeyGroupState;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Serializer for {@link SavepointV0} instances.
- *
- * <p>In contrast to previous savepoint versions, this serializer makes sure
- * that no default Java serialization is used for serialization. Therefore, we
- * don't rely on any involved Java classes to stay the same.
- */
-class SavepointV1Serializer implements SavepointSerializer<SavepointV0> {
-
- public static final SavepointV1Serializer INSTANCE = new SavepointV1Serializer();
-
- private SavepointV1Serializer() {
- }
-
- @Override
- public void serialize(SavepointV0 savepoint, DataOutputStream dos) throws IOException {
- dos.writeLong(savepoint.getCheckpointId());
-
- Collection<TaskState> taskStates = savepoint.getTaskStates();
- dos.writeInt(taskStates.size());
-
- for (TaskState taskState : savepoint.getTaskStates()) {
- // Vertex ID
- dos.writeLong(taskState.getJobVertexID().getLowerPart());
- dos.writeLong(taskState.getJobVertexID().getUpperPart());
-
- // Parallelism
- int parallelism = taskState.getParallelism();
- dos.writeInt(parallelism);
-
- // Sub task states
- dos.writeInt(taskState.getNumberCollectedStates());
-
- for (int i = 0; i < parallelism; i++) {
- SubtaskState subtaskState = taskState.getState(i);
-
- if (subtaskState != null) {
- dos.writeInt(i);
-
- SerializedValue<?> serializedValue = subtaskState.getState();
- if (serializedValue == null) {
- dos.writeInt(-1); // null
- } else {
- byte[] serialized = serializedValue.getByteArray();
- dos.writeInt(serialized.length);
- dos.write(serialized, 0, serialized.length);
- }
-
- dos.writeLong(subtaskState.getStateSize());
- dos.writeLong(subtaskState.getDuration());
- }
- }
-
- // Key group states
- dos.writeInt(taskState.getNumberCollectedKvStates());
-
- for (int i = 0; i < parallelism; i++) {
- KeyGroupState keyGroupState = taskState.getKvState(i);
-
- if (keyGroupState != null) {
- dos.write(i);
-
- SerializedValue<?> serializedValue = keyGroupState.getKeyGroupState();
- if (serializedValue == null) {
- dos.writeInt(-1); // null
- } else {
- byte[] serialized = serializedValue.getByteArray();
- dos.writeInt(serialized.length);
- dos.write(serialized, 0, serialized.length);
- }
-
- dos.writeLong(keyGroupState.getStateSize());
- dos.writeLong(keyGroupState.getDuration());
- }
- }
- }
- }
-
- @Override
- public SavepointV0 deserialize(DataInputStream dis) throws IOException {
- long checkpointId = dis.readLong();
-
- // Task states
- int numTaskStates = dis.readInt();
- List<TaskState> taskStates = new ArrayList<>(numTaskStates);
-
- for (int i = 0; i < numTaskStates; i++) {
- JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
- int parallelism = dis.readInt();
-
- // Add task state
- TaskState taskState = new TaskState(jobVertexId, parallelism);
- taskStates.add(taskState);
-
- // Sub task states
- int numSubTaskStates = dis.readInt();
- for (int j = 0; j < numSubTaskStates; j++) {
- int subtaskIndex = dis.readInt();
-
- int length = dis.readInt();
-
- SerializedValue<StateHandle<?>> serializedValue;
- if (length == -1) {
- serializedValue = new SerializedValue<>(null);
- } else {
- byte[] serializedData = new byte[length];
- dis.readFully(serializedData, 0, length);
- serializedValue = SerializedValue.fromBytes(serializedData);
- }
-
- long stateSize = dis.readLong();
- long duration = dis.readLong();
-
- SubtaskState subtaskState = new SubtaskState(
- serializedValue,
- stateSize,
- duration);
-
- taskState.putState(subtaskIndex, subtaskState);
- }
-
- // Key group states
- int numKvStates = dis.readInt();
- for (int j = 0; j < numKvStates; j++) {
- int keyGroupIndex = dis.readInt();
-
- int length = dis.readInt();
-
- SerializedValue<StateHandle<?>> serializedValue;
- if (length == -1) {
- serializedValue = new SerializedValue<>(null);
- } else {
- byte[] serializedData = new byte[length];
- dis.readFully(serializedData, 0, length);
- serializedValue = SerializedValue.fromBytes(serializedData);
- }
-
- long stateSize = dis.readLong();
- long duration = dis.readLong();
-
- KeyGroupState keyGroupState = new KeyGroupState(
- serializedValue,
- stateSize,
- duration);
-
- taskState.putKvState(keyGroupIndex, keyGroupState);
- }
- }
-
- return new SavepointV0(checkpointId, taskStates);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a84b04f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
index 24ab296..6b8c651 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
@@ -58,7 +58,7 @@ public class FsSavepointStoreTest {
assertEquals(0, tmp.getRoot().listFiles().length);
// Store
- SavepointV0 stored = new SavepointV0(1929292, SavepointV01Test.createTaskStates(4, 24));
+ SavepointV0 stored = new SavepointV0(1929292, SavepointV0Test.createTaskStates(4, 24));
String path = store.storeSavepoint(stored);
assertEquals(1, tmp.getRoot().listFiles().length);
@@ -122,7 +122,7 @@ public class FsSavepointStoreTest {
assertEquals(1, tmp.getRoot().listFiles().length);
// Savepoint v0
- Savepoint savepoint = new SavepointV0(checkpointId, SavepointV01Test.createTaskStates(4, 32));
+ Savepoint savepoint = new SavepointV0(checkpointId, SavepointV0Test.createTaskStates(4, 32));
String pathSavepoint = store.storeSavepoint(savepoint);
assertEquals(2, tmp.getRoot().listFiles().length);
http://git-wip-us.apache.org/repos/asf/flink/blob/9a84b04f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV01SerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV01SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV01SerializerTest.java
deleted file mode 100644
index 613bb61..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV01SerializerTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-
-import static org.junit.Assert.assertEquals;
-
-public class SavepointV01SerializerTest {
-
- /**
- * Test serialization of {@link SavepointV0} instance.
- */
- @Test
- public void testSerializeDeserializeV1() throws Exception {
- SavepointV0 expected = new SavepointV0(123123, SavepointV01Test.createTaskStates(8, 32));
-
- SavepointV1Serializer serializer = SavepointV1Serializer.INSTANCE;
-
- // Serialize
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- serializer.serialize(expected, new DataOutputViewStreamWrapper(baos));
- byte[] bytes = baos.toByteArray();
-
- // Deserialize
- ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
- Savepoint actual = serializer.deserialize(new DataInputViewStreamWrapper(bais));
-
- assertEquals(expected, actual);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a84b04f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV01Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV01Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV01Test.java
deleted file mode 100644
index 1a66afb..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV01Test.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.CheckpointMessagesTest;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class SavepointV01Test {
-
- /**
- * Simple test of savepoint methods.
- */
- @Test
- public void testSavepointV1() throws Exception {
- long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE);
- int numTaskStates = 4;
- int numSubtaskStates = 16;
-
- Collection<TaskState> expected = createTaskStates(numTaskStates, numSubtaskStates);
-
- SavepointV0 savepoint = new SavepointV0(checkpointId, expected);
-
- assertEquals(SavepointV0.VERSION, savepoint.getVersion());
- assertEquals(checkpointId, savepoint.getCheckpointId());
- assertEquals(expected, savepoint.getTaskStates());
-
- assertFalse(savepoint.getTaskStates().isEmpty());
- savepoint.dispose(ClassLoader.getSystemClassLoader());
- assertTrue(savepoint.getTaskStates().isEmpty());
- }
-
- static Collection<TaskState> createTaskStates(int numTaskStates, int numSubtaskStates) throws IOException {
- List<TaskState> taskStates = new ArrayList<>(numTaskStates);
-
- for (int i = 0; i < numTaskStates; i++) {
- TaskState taskState = new TaskState(new JobVertexID(), numSubtaskStates);
- for (int j = 0; j < numSubtaskStates; j++) {
- SerializedValue<StateHandle<?>> stateHandle = new SerializedValue<StateHandle<?>>(
- new CheckpointMessagesTest.MyHandle());
-
- taskState.putState(i, new SubtaskState(stateHandle, 0, 0));
- }
-
- taskStates.add(taskState);
- }
-
- return taskStates;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a84b04f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java
new file mode 100644
index 0000000..b656d90
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+
+import static org.junit.Assert.assertEquals;
+
+public class SavepointV0SerializerTest {
+
+ /**
+ * Test serialization of {@link SavepointV0} instance.
+ */
+ @Test
+ public void testSerializeDeserializeV1() throws Exception {
+ SavepointV0 expected = new SavepointV0(123123, SavepointV0Test.createTaskStates(8, 32));
+
+ SavepointV0Serializer serializer = SavepointV0Serializer.INSTANCE;
+
+ // Serialize
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ serializer.serialize(expected, new DataOutputViewStreamWrapper(baos));
+ byte[] bytes = baos.toByteArray();
+
+ // Deserialize
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ Savepoint actual = serializer.deserialize(new DataInputViewStreamWrapper(bais));
+
+ assertEquals(expected, actual);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a84b04f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java
new file mode 100644
index 0000000..4d72c42
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.CheckpointMessagesTest;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.SerializedValue;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SavepointV0Test {
+
+ /**
+ * Simple test of savepoint methods.
+ */
+ @Test
+ public void testSavepointV0() throws Exception {
+ long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE);
+ int numTaskStates = 4;
+ int numSubtaskStates = 16;
+
+ Collection<TaskState> expected = createTaskStates(numTaskStates, numSubtaskStates);
+
+ SavepointV0 savepoint = new SavepointV0(checkpointId, expected);
+
+ assertEquals(SavepointV0.VERSION, savepoint.getVersion());
+ assertEquals(checkpointId, savepoint.getCheckpointId());
+ assertEquals(expected, savepoint.getTaskStates());
+
+ assertFalse(savepoint.getTaskStates().isEmpty());
+ savepoint.dispose(ClassLoader.getSystemClassLoader());
+ assertTrue(savepoint.getTaskStates().isEmpty());
+ }
+
+ static Collection<TaskState> createTaskStates(int numTaskStates, int numSubtaskStates) throws IOException {
+ List<TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+ for (int i = 0; i < numTaskStates; i++) {
+ TaskState taskState = new TaskState(new JobVertexID(), numSubtaskStates);
+ for (int j = 0; j < numSubtaskStates; j++) {
+ SerializedValue<StateHandle<?>> stateHandle = new SerializedValue<StateHandle<?>>(
+ new CheckpointMessagesTest.MyHandle());
+
+ taskState.putState(i, new SubtaskState(stateHandle, 0, 0));
+ }
+
+ taskStates.add(taskState);
+ }
+
+ return taskStates;
+ }
+
+}
[2/4] flink git commit: [hotfix] [distributed runtime] Add overflow
check for ZooKeeper checkpoint ID counter.
Posted by se...@apache.org.
[hotfix] [distributed runtime] Add overflow check for ZooKeeper checkpoint ID counter.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/862e7f0e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/862e7f0e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/862e7f0e
Branch: refs/heads/master
Commit: 862e7f0e9b11c8c218b0fe35fcdb192ea205e2fa
Parents: f1e9dae
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 5 19:16:51 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 8 19:27:10 2016 +0200
----------------------------------------------------------------------
.../runtime/checkpoint/ZooKeeperCheckpointIDCounter.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/862e7f0e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
index c71eb7b..12839c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
@@ -131,8 +131,13 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
}
VersionedValue<Integer> current = sharedCount.getVersionedValue();
+ int newCount = current.getValue() + 1;
- Integer newCount = current.getValue() + 1;
+ if (newCount < 0) {
+ // overflow and wrap around
+ throw new Exception("Checkpoint counter overflow. ZooKeeper checkpoint counter only supports " +
+ "checkpoints Ids up to " + Integer.MAX_VALUE);
+ }
if (sharedCount.trySetCount(current, newCount)) {
return current.getValue();
@@ -161,7 +166,7 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
* Connection state listener. In case of {@link ConnectionState#SUSPENDED} or {@link
* ConnectionState#LOST} we are not guaranteed to read a current count from ZooKeeper.
*/
- private class SharedCountConnectionStateListener implements ConnectionStateListener {
+ private static class SharedCountConnectionStateListener implements ConnectionStateListener {
private volatile ConnectionState lastState;