You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/26 16:06:04 UTC
flink git commit: [FLINK-4684] [checkpoints] Remove redundant class loader from CheckpointCoordinator
Repository: flink
Updated Branches:
refs/heads/master 8fa313c39 -> 70e71c161
[FLINK-4684] [checkpoints] Remove redundant class loader from CheckpointCoordinator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70e71c16
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70e71c16
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70e71c16
Branch: refs/heads/master
Commit: 70e71c16177b40c2418e6a8ca0838bf117f6a926
Parents: 8fa313c
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 26 12:32:10 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 26 18:05:01 2016 +0200
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 5 -
.../flink/runtime/checkpoint/TaskState.java | 2 +-
.../runtime/executiongraph/ExecutionGraph.java | 1 -
.../apache/flink/runtime/state/StateUtil.java | 2 +-
.../checkpoint/CheckpointCoordinatorTest.java | 101 +++++++------------
.../checkpoint/CheckpointStateRestoreTest.java | 7 +-
6 files changed, 38 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index fc40911..6a43ddf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -112,9 +112,6 @@ public class CheckpointCoordinator {
* need to be ascending across job managers. */
private final CheckpointIDCounter checkpointIdCounter;
- /** Class loader used to deserialize the state handles (as they may be user-defined) */
- private final ClassLoader userClassLoader;
-
/** The base checkpoint interval. Actual trigger time may be affected by the
* max concurrent checkpoints and minimum-pause values */
private final long baseInterval;
@@ -167,7 +164,6 @@ public class CheckpointCoordinator {
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
ExecutionVertex[] tasksToCommitTo,
- ClassLoader userClassLoader,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
SavepointStore savepointStore,
@@ -198,7 +194,6 @@ public class CheckpointCoordinator {
this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
this.savepointStore = checkNotNull(savepointStore);
this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
- this.userClassLoader = checkNotNull(userClassLoader);
this.statsTracker = checkNotNull(statsTracker);
this.timer = new Timer("Checkpoint Timer", true);
http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index 657dd60..f5e3618 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -62,7 +62,7 @@ public class TaskState implements StateObject {
"Parallelism " + parallelism + " is not smaller or equal to max parallelism " + maxParallelism + ".");
this.jobVertexID = jobVertexID;
- //preallocate lists of the required size, so that we can randomly set values to indexes
+
this.subtaskStates = new HashMap<>(parallelism);
this.keyGroupsStateHandles = new HashMap<>(parallelism);
http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index c3cf297..7c3fa0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -390,7 +390,6 @@ public class ExecutionGraph {
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
- userClassLoader,
checkpointIDCounter,
checkpointStore,
savepointStore,
http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
index 3c5157e..aa28404 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
@@ -69,7 +69,7 @@ public class StateUtil {
* occurring exceptions are suppressed and collected until the iteration is over and emitted as a single exception.
*
* @param handlesToDiscard State handles to discard. Passed iterable is allowed to deliver null values.
- * @throws Exception exception that is a collection of all suppressed exceptions that were caught during iteration
+ * @throws IOException exception that is a collection of all suppressed exceptions that were caught during iteration
*/
public static void bestEffortCloseAllStateObjects(
Iterable<? extends StateObject> handlesToDiscard) throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index bc61742..9adaa86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -42,10 +42,13 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
+
import org.junit.Assert;
import org.junit.Test;
+
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
@@ -109,7 +112,6 @@ public class CheckpointCoordinatorTest {
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] {},
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl),
new HeapSavepointStore(),
@@ -162,7 +164,6 @@ public class CheckpointCoordinatorTest {
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] {},
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl),
new HeapSavepointStore(),
@@ -213,7 +214,6 @@ public class CheckpointCoordinatorTest {
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] {},
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl),
new HeapSavepointStore(),
@@ -265,7 +265,6 @@ public class CheckpointCoordinatorTest {
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl),
new HeapSavepointStore(),
@@ -390,7 +389,6 @@ public class CheckpointCoordinatorTest {
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl),
new HeapSavepointStore(),
@@ -511,7 +509,6 @@ public class CheckpointCoordinatorTest {
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl),
new HeapSavepointStore(),
@@ -662,7 +659,6 @@ public class CheckpointCoordinatorTest {
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
new ExecutionVertex[] { commitVertex },
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2, cl),
new HeapSavepointStore(),
@@ -798,7 +794,6 @@ public class CheckpointCoordinatorTest {
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
new ExecutionVertex[] { commitVertex },
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(10, cl),
new HeapSavepointStore(),
@@ -920,7 +915,6 @@ public class CheckpointCoordinatorTest {
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] { commitVertex },
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2, cl),
new HeapSavepointStore(),
@@ -989,7 +983,6 @@ public class CheckpointCoordinatorTest {
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] { commitVertex },
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2, cl),
new HeapSavepointStore(),
@@ -1069,7 +1062,6 @@ public class CheckpointCoordinatorTest {
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex },
new ExecutionVertex[] { commitVertex },
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2, cl),
new HeapSavepointStore(),
@@ -1161,7 +1153,6 @@ public class CheckpointCoordinatorTest {
new ExecutionVertex[] { vertex1 },
new ExecutionVertex[] { vertex1 },
new ExecutionVertex[] { vertex1 },
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2, cl),
new HeapSavepointStore(),
@@ -1246,7 +1237,6 @@ public class CheckpointCoordinatorTest {
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl),
new HeapSavepointStore(),
@@ -1384,7 +1374,6 @@ public class CheckpointCoordinatorTest {
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
- cl,
counter,
new StandaloneCompletedCheckpointStore(10, cl),
new HeapSavepointStore(),
@@ -1470,8 +1459,9 @@ public class CheckpointCoordinatorTest {
maxConcurrentAttempts,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex },
- new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
- (), new StandaloneCompletedCheckpointStore(2, cl),
+ new ExecutionVertex[] { commitVertex },
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(2, cl),
new HeapSavepointStore(),
new DisabledCheckpointStatsTracker());
@@ -1541,8 +1531,9 @@ public class CheckpointCoordinatorTest {
maxConcurrentAttempts, // max two concurrent checkpoints
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex },
- new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
- (), new StandaloneCompletedCheckpointStore(2, cl),
+ new ExecutionVertex[] { commitVertex },
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(2, cl),
new HeapSavepointStore(),
new DisabledCheckpointStatsTracker());
@@ -1621,7 +1612,8 @@ public class CheckpointCoordinatorTest {
2, // max two concurrent checkpoints
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex },
- new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter(),
+ new ExecutionVertex[] { commitVertex },
+ new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2, cl),
new HeapSavepointStore(),
new DisabledCheckpointStatsTracker());
@@ -1672,7 +1664,6 @@ public class CheckpointCoordinatorTest {
new ExecutionVertex[] { vertex1 },
new ExecutionVertex[] { vertex1 },
new ExecutionVertex[] { vertex1 },
- cl,
checkpointIDCounter,
new StandaloneCompletedCheckpointStore(2, cl),
new HeapSavepointStore(),
@@ -1723,7 +1714,6 @@ public class CheckpointCoordinatorTest {
new ExecutionVertex[] { vertex1 },
new ExecutionVertex[] { vertex1 },
new ExecutionVertex[] { vertex1 },
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2, cl),
new HeapSavepointStore(),
@@ -1772,7 +1762,8 @@ public class CheckpointCoordinatorTest {
allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
- ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[0]);
+ ExecutionVertex[] arrayExecutionVertices =
+ allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -1784,7 +1775,6 @@ public class CheckpointCoordinatorTest {
arrayExecutionVertices,
arrayExecutionVertices,
arrayExecutionVertices,
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl),
new HeapSavepointStore(),
@@ -1796,8 +1786,8 @@ public class CheckpointCoordinatorTest {
assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
- List<KeyGroupRange> keyGroupPartitions1 = coord.createKeyGroupPartitions(maxParallelism1, parallelism1);
- List<KeyGroupRange> keyGroupPartitions2 = coord.createKeyGroupPartitions(maxParallelism2, parallelism2);
+ List<KeyGroupRange> keyGroupPartitions1 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
+ List<KeyGroupRange> keyGroupPartitions2 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2);
for (int index = 0; index < jobVertex1.getParallelism(); index++) {
ChainedStateHandle<StreamStateHandle> nonPartitionedState = generateStateForVertex(jobVertexID1, index);
@@ -1876,7 +1866,7 @@ public class CheckpointCoordinatorTest {
allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
- ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[0]);
+ ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -1888,7 +1878,6 @@ public class CheckpointCoordinatorTest {
arrayExecutionVertices,
arrayExecutionVertices,
arrayExecutionVertices,
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl),
new HeapSavepointStore(),
@@ -1900,8 +1889,8 @@ public class CheckpointCoordinatorTest {
assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
- List<KeyGroupRange> keyGroupPartitions1 = coord.createKeyGroupPartitions(maxParallelism1, parallelism1);
- List<KeyGroupRange> keyGroupPartitions2 = coord.createKeyGroupPartitions(maxParallelism2, parallelism2);
+ List<KeyGroupRange> keyGroupPartitions1 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
+ List<KeyGroupRange> keyGroupPartitions2 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2);
for (int index = 0; index < jobVertex1.getParallelism(); index++) {
ChainedStateHandle<StreamStateHandle> valueSizeTuple = generateStateForVertex(jobVertexID1, index);
@@ -1991,7 +1980,8 @@ public class CheckpointCoordinatorTest {
allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
- ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[0]);
+ ExecutionVertex[] arrayExecutionVertices =
+ allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -2003,7 +1993,6 @@ public class CheckpointCoordinatorTest {
arrayExecutionVertices,
arrayExecutionVertices,
arrayExecutionVertices,
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl),
new HeapSavepointStore(),
@@ -2015,8 +2004,10 @@ public class CheckpointCoordinatorTest {
assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
- List<KeyGroupRange> keyGroupPartitions1 = coord.createKeyGroupPartitions(maxParallelism1, parallelism1);
- List<KeyGroupRange> keyGroupPartitions2 = coord.createKeyGroupPartitions(maxParallelism2, parallelism2);
+ List<KeyGroupRange> keyGroupPartitions1 =
+ CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
+ List<KeyGroupRange> keyGroupPartitions2 =
+ CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2);
for (int index = 0; index < jobVertex1.getParallelism(); index++) {
ChainedStateHandle<StreamStateHandle> valueSizeTuple = generateStateForVertex(jobVertexID1, index);
@@ -2110,7 +2101,8 @@ public class CheckpointCoordinatorTest {
allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
- ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[0]);
+ ExecutionVertex[] arrayExecutionVertices =
+ allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -2122,7 +2114,6 @@ public class CheckpointCoordinatorTest {
arrayExecutionVertices,
arrayExecutionVertices,
arrayExecutionVertices,
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl),
new HeapSavepointStore(),
@@ -2134,8 +2125,10 @@ public class CheckpointCoordinatorTest {
assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
- List<KeyGroupRange> keyGroupPartitions1 = coord.createKeyGroupPartitions(maxParallelism1, parallelism1);
- List<KeyGroupRange> keyGroupPartitions2 = coord.createKeyGroupPartitions(maxParallelism2, parallelism2);
+ List<KeyGroupRange> keyGroupPartitions1 =
+ CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
+ List<KeyGroupRange> keyGroupPartitions2 =
+ CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2);
for (int index = 0; index < jobVertex1.getParallelism(); index++) {
ChainedStateHandle<StreamStateHandle> valueSizeTuple = generateStateForVertex(jobVertexID1, index);
@@ -2173,7 +2166,8 @@ public class CheckpointCoordinatorTest {
int newParallelism2 = 13;
- List<KeyGroupRange> newKeyGroupPartitions2 = coord.createKeyGroupPartitions(maxParallelism2, newParallelism2);
+ List<KeyGroupRange> newKeyGroupPartitions2 =
+ CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, newParallelism2);
final ExecutionJobVertex newJobVertex1 = mockExecutionJobVertex(
jobVertexID1,
@@ -2207,37 +2201,12 @@ public class CheckpointCoordinatorTest {
// Utilities
// ------------------------------------------------------------------------
- static void sendAckMessageToCoordinator(
- CheckpointCoordinator coord,
- long checkpointId, JobID jid,
- ExecutionJobVertex jobVertex,
- JobVertexID jobVertexID,
- List<KeyGroupRange> keyGroupPartitions) throws Exception {
-
- for (int index = 0; index < jobVertex.getParallelism(); index++) {
- ChainedStateHandle<StreamStateHandle> state = generateStateForVertex(jobVertexID, index);
- List<KeyGroupsStateHandle> keyGroupState = generateKeyGroupState(
- jobVertexID,
- keyGroupPartitions.get(index));
-
- AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- jid,
- jobVertex.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
- state,
- keyGroupState);
-
- coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
- }
- }
-
public static List<KeyGroupsStateHandle> generateKeyGroupState(
JobVertexID jobVertexID,
KeyGroupRange keyGroupPartition) throws IOException {
- KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupPartition);
List<Integer> testStatesLists = new ArrayList<>(keyGroupPartition.getNumberOfKeyGroups());
- int runningGroupsOffset = 0;
+
// generate state for one keygroup
for (int keyGroupIndex : keyGroupPartition) {
Random random = new Random(jobVertexID.hashCode() + keyGroupIndex);
@@ -2270,8 +2239,7 @@ public class CheckpointCoordinatorTest {
//write all generated values in a single byte array, which is index by groupOffsetsInFinalByteArray
byte[] allSerializedValuesConcatenated = new byte[runningGroupsOffset];
runningGroupsOffset = 0;
- byte[] old = null;
- for(byte[] serializedGroupValue : serializedGroupValues) {
+ for (byte[] serializedGroupValue : serializedGroupValues) {
System.arraycopy(
serializedGroupValue,
0,
@@ -2279,7 +2247,6 @@ public class CheckpointCoordinatorTest {
runningGroupsOffset,
serializedGroupValue.length);
runningGroupsOffset += serializedGroupValue.length;
- old = serializedGroupValue;
}
ByteStreamStateHandle allSerializedStatesHandle = new ByteStreamStateHandle(
http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 5416292..a4896aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -32,12 +32,11 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
-
import org.apache.flink.runtime.util.SerializableObject;
+
import org.junit.Test;
import org.mockito.Mockito;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -101,7 +100,6 @@ public class CheckpointStateRestoreTest {
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[0],
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl),
new HeapSavepointStore(),
@@ -182,7 +180,6 @@ public class CheckpointStateRestoreTest {
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[0],
- cl,
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl),
new HeapSavepointStore(),
@@ -231,7 +228,7 @@ public class CheckpointStateRestoreTest {
Integer.MAX_VALUE,
new ExecutionVertex[] { mock(ExecutionVertex.class) },
new ExecutionVertex[] { mock(ExecutionVertex.class) },
- new ExecutionVertex[0], cl,
+ new ExecutionVertex[0],
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, cl),
new HeapSavepointStore(),