You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/07/28 13:42:53 UTC
[1/2] flink git commit: [FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start
Repository: flink
Updated Branches:
refs/heads/release-1.3 0225db288 -> 09caa9ffd
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 77423c2..dc2b11e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -18,13 +18,14 @@
package org.apache.flink.runtime.checkpoint;
-import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+
+import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;
import org.junit.AfterClass;
import org.junit.Before;
@@ -106,8 +107,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
// Recover
- sharedStateRegistry.clear();
- checkpoints.recover(sharedStateRegistry);
+ sharedStateRegistry.close();
+ sharedStateRegistry = new SharedStateRegistry();
+ checkpoints.recover();
assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
@@ -148,8 +150,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
assertEquals(0, store.getNumberOfRetainedCheckpoints());
assertNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
- sharedStateRegistry.clear();
- store.recover(sharedStateRegistry);
+ sharedStateRegistry.close();
+ store.recover();
assertEquals(0, store.getNumberOfRetainedCheckpoints());
}
@@ -182,8 +184,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
assertEquals("The checkpoint node should not be locked.", 0, stat.getNumChildren());
// Recover again
- sharedStateRegistry.clear();
- store.recover(sharedStateRegistry);
+ sharedStateRegistry.close();
+ store.recover();
CompletedCheckpoint recovered = store.getLatestCheckpoint();
assertEquals(checkpoint, recovered);
@@ -209,8 +211,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
checkpointStore.addCheckpoint(checkpoint);
}
- sharedStateRegistry.clear();
- checkpointStore.recover(sharedStateRegistry);
+ sharedStateRegistry.close();
+ checkpointStore.recover();
CompletedCheckpoint latestCheckpoint = checkpointStore.getLatestCheckpoint();
@@ -239,8 +241,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
zkCheckpointStore1.addCheckpoint(completedCheckpoint);
// recover the checkpoint by a different checkpoint store
- sharedStateRegistry.clear();
- zkCheckpointStore2.recover(sharedStateRegistry);
+ sharedStateRegistry.close();
+ sharedStateRegistry = new SharedStateRegistry();
+ zkCheckpointStore2.recover();
CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint();
assertTrue(recoveredCheckpoint instanceof TestCompletedCheckpoint);
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 91bab85..3171f1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -52,7 +52,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyCollection;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -162,11 +161,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
stateStorage,
Executors.directExecutor());
- SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
- zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry);
-
- verify(retrievableStateHandle1.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
- verify(retrievableStateHandle2.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
+ zooKeeperCompletedCheckpointStore.recover();
CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
index c1b3ccd..9f6f88e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
@@ -19,12 +19,15 @@
package org.apache.flink.runtime.state;
import org.apache.flink.runtime.checkpoint.savepoint.CheckpointTestUtils;
+
import org.junit.Test;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.spy;
@@ -59,8 +62,6 @@ public class IncrementalKeyedStateHandleTest {
@Test
public void testSharedStateDeRegistration() throws Exception {
- Random rnd = new Random(42);
-
SharedStateRegistry registry = spy(new SharedStateRegistry());
// Create two state handles with overlapping shared state
@@ -186,6 +187,76 @@ public class IncrementalKeyedStateHandleTest {
verify(stateHandle2.getMetaStateHandle(), times(1)).discardState();
}
+ /**
+ * This tests that re-registration of shared state with another registry works as expected. This simulates a
+ * recovery from a checkpoint, when the checkpoint coordinator creates a new shared state registry and re-registers
+ * all live checkpoint states.
+ */
+ @Test
+ public void testSharedStateReRegistration() throws Exception {
+
+ SharedStateRegistry stateRegistryA = spy(new SharedStateRegistry());
+
+ IncrementalKeyedStateHandle stateHandleX = create(new Random(1));
+ IncrementalKeyedStateHandle stateHandleY = create(new Random(2));
+ IncrementalKeyedStateHandle stateHandleZ = create(new Random(3));
+
+ // Now we register first time ...
+ stateHandleX.registerSharedStates(stateRegistryA);
+ stateHandleY.registerSharedStates(stateRegistryA);
+ stateHandleZ.registerSharedStates(stateRegistryA);
+
+ try {
+ // Second attempt should fail
+ stateHandleX.registerSharedStates(stateRegistryA);
+ fail("Should not be able to register twice with the same registry.");
+ } catch (IllegalStateException ignore) {
+ }
+
+ // Everything should be discarded for this handle
+ stateHandleZ.discardState();
+ verify(stateHandleZ.getMetaStateHandle(), times(1)).discardState();
+ for (StreamStateHandle stateHandle : stateHandleZ.getSharedState().values()) {
+ verify(stateHandle, times(1)).discardState();
+ }
+
+ // Close the first registry
+ stateRegistryA.close();
+
+ // Attempt to register to closed registry should trigger exception
+ try {
+ create(new Random(4)).registerSharedStates(stateRegistryA);
+ fail("Should not be able to register new state to closed registry.");
+ } catch (IllegalStateException ignore) {
+ }
+
+ // All state should still get discarded
+ stateHandleY.discardState();
+ verify(stateHandleY.getMetaStateHandle(), times(1)).discardState();
+ for (StreamStateHandle stateHandle : stateHandleY.getSharedState().values()) {
+ verify(stateHandle, times(1)).discardState();
+ }
+
+ // This should still be unaffected
+ verify(stateHandleX.getMetaStateHandle(), never()).discardState();
+ for (StreamStateHandle stateHandle : stateHandleX.getSharedState().values()) {
+ verify(stateHandle, never()).discardState();
+ }
+
+ // We re-register the handle with a new registry
+ SharedStateRegistry sharedStateRegistryB = spy(new SharedStateRegistry());
+ stateHandleX.registerSharedStates(sharedStateRegistryB);
+ stateHandleX.discardState();
+
+ // Should be completely discarded because it is tracked through the new registry
+ verify(stateHandleX.getMetaStateHandle(), times(1)).discardState();
+ for (StreamStateHandle stateHandle : stateHandleX.getSharedState().values()) {
+ verify(stateHandle, times(1)).discardState();
+ }
+
+ sharedStateRegistryB.close();
+ }
+
private static IncrementalKeyedStateHandle create(Random rnd) {
return new IncrementalKeyedStateHandle(
UUID.nameUUIDFromBytes("test".getBytes()),
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
index a0c4412..037ecd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
@@ -21,7 +21,8 @@ package org.apache.flink.runtime.testutils;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,14 +42,21 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS
private final ArrayDeque<CompletedCheckpoint> suspended = new ArrayDeque<>(2);
+ private final int maxRetainedCheckpoints;
+
+ public RecoverableCompletedCheckpointStore() {
+ this(1);
+ }
+
+ public RecoverableCompletedCheckpointStore(int maxRetainedCheckpoints) {
+ Preconditions.checkArgument(maxRetainedCheckpoints > 0);
+ this.maxRetainedCheckpoints = maxRetainedCheckpoints;
+ }
+
@Override
- public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+ public void recover() throws Exception {
checkpoints.addAll(suspended);
suspended.clear();
-
- for (CompletedCheckpoint checkpoint : checkpoints) {
- checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
- }
}
@Override
@@ -56,13 +64,16 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS
checkpoints.addLast(checkpoint);
-
- if (checkpoints.size() > 1) {
- CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
- checkpointToSubsume.discardOnSubsume();
+ if (checkpoints.size() > maxRetainedCheckpoints) {
+ removeOldestCheckpoint();
}
}
+ public void removeOldestCheckpoint() throws Exception {
+ CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
+ checkpointToSubsume.discardOnSubsume();
+ }
+
@Override
public CompletedCheckpoint getLatestCheckpoint() throws Exception {
return checkpoints.isEmpty() ? null : checkpoints.getLast();
@@ -96,7 +107,7 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS
@Override
public int getMaxNumberOfRetainedCheckpoints() {
- return 1;
+ return maxRetainedCheckpoints;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 3ce5a14..fb5f0e7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -837,7 +837,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// Utilities
// ------------------------------------------------------------------------
-
@Override
public String toString() {
return getName();
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 6ad7708..7c38d8d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -27,9 +27,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointListener;
@@ -47,19 +50,25 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.After;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -84,22 +93,35 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
private static TestStreamEnvironment env;
+ private static TestingServer zkServer;
+
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
private StateBackendEnum stateBackendEnum;
private AbstractStateBackend stateBackend;
- AbstractEventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum) {
+ AbstractEventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum) throws IOException {
this.stateBackendEnum = stateBackendEnum;
}
enum StateBackendEnum {
- MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, MEM_ASYNC, FILE_ASYNC
+ MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC
}
- @BeforeClass
- public static void startTestCluster() {
+ @Before
+ public void startTestCluster() throws Exception {
+
+ // Testing HA Scenario / ZKCompletedCheckpointStore with incremental checkpoints
+ if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) {
+ zkServer = new TestingServer();
+ zkServer.start();
+ }
+
+ TemporaryFolder temporaryFolder = new TemporaryFolder();
+ temporaryFolder.create();
+ final File haDir = temporaryFolder.newFolder();
+
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
@@ -107,21 +129,28 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB
- cluster = new LocalFlinkMiniCluster(config, false);
+ if (zkServer != null) {
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+ config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
+ }
+
+ // purposefully delay in the executor to tease out races
+ final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
+ HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+ config,
+ new Executor() {
+ @Override
+ public void execute(Runnable command) {
+ executor.schedule(command, 500, MILLISECONDS);
+ }
+ });
+
+ cluster = new LocalFlinkMiniCluster(config, haServices, false);
cluster.start();
env = new TestStreamEnvironment(cluster, PARALLELISM);
- }
-
- @AfterClass
- public static void stopTestCluster() {
- if (cluster != null) {
- cluster.stop();
- }
- }
- @Before
- public void initStateBackend() throws IOException {
switch (stateBackendEnum) {
case MEM:
this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
@@ -146,7 +175,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
this.stateBackend = rdb;
break;
}
- case ROCKSDB_INCREMENTAL: {
+ case ROCKSDB_INCREMENTAL:
+ case ROCKSDB_INCREMENTAL_ZK: {
String rocksDb = tempFolder.newFolder().getAbsolutePath();
String backups = tempFolder.newFolder().getAbsolutePath();
// we use the fs backend with small threshold here to test the behaviour with file
@@ -160,7 +190,21 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
this.stateBackend = rdb;
break;
}
+ default:
+ throw new IllegalStateException("No backend selected.");
+ }
+ }
+
+ @After
+ public void stopTestCluster() throws IOException {
+ if (cluster != null) {
+ cluster.stop();
+ cluster = null;
+ }
+ if (zkServer != null) {
+ zkServer.stop();
+ zkServer = null;
}
}
@@ -172,7 +216,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
final int WINDOW_SIZE = windowSize();
final int NUM_KEYS = numKeys();
FailingSource.reset();
-
+
try {
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -506,7 +550,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
}
}
-
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
@@ -667,7 +710,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
-
Integer curr = windowCounts.get(value.f0);
if (curr != null) {
windowCounts.put(value.f0, curr + 1);
@@ -754,7 +796,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
windowCounts.put(value.f0, 1);
}
-
// verify the contents of that window, the contents should be:
// (key + num windows so far)
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
index a5bf10c..9abbddd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
package org.apache.flink.test.checkpointing;
+import java.io.IOException;
+
public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
- public AsyncFileBackendEventTimeWindowCheckpointingITCase() {
+ public AsyncFileBackendEventTimeWindowCheckpointingITCase() throws IOException {
super(StateBackendEnum.FILE_ASYNC);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
index ef9ad37..62041a5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
package org.apache.flink.test.checkpointing;
+import java.io.IOException;
+
public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
- public AsyncMemBackendEventTimeWindowCheckpointingITCase() {
+ public AsyncMemBackendEventTimeWindowCheckpointingITCase() throws IOException {
super(StateBackendEnum.MEM_ASYNC);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
index 65fda09..3111f05 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
package org.apache.flink.test.checkpointing;
+import java.io.IOException;
+
public class FileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
- public FileBackendEventTimeWindowCheckpointingITCase() {
+ public FileBackendEventTimeWindowCheckpointingITCase() throws IOException {
super(StateBackendEnum.FILE);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..8e23909
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import java.io.IOException;
+
+/**
+ * Integration tests for incremental RocksDB backend.
+ */
+public class HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
+
+ public HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() throws IOException {
+ super(StateBackendEnum.ROCKSDB_INCREMENTAL_ZK);
+ }
+
+ @Override
+ protected int numElementsPerKey() {
+ return 3000;
+ }
+
+ @Override
+ protected int windowSize() {
+ return 1000;
+ }
+
+ @Override
+ protected int windowSlide() {
+ return 100;
+ }
+
+ @Override
+ protected int numKeys() {
+ return 100;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
index 352f9f7..2cdfbe7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
package org.apache.flink.test.checkpointing;
+import java.io.IOException;
+
public class IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
- public IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() {
+ public IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() throws IOException {
super(StateBackendEnum.ROCKSDB_INCREMENTAL);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
index 899b8d6..701b746 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
package org.apache.flink.test.checkpointing;
+import java.io.IOException;
+
public class MemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
- public MemBackendEventTimeWindowCheckpointingITCase() {
+ public MemBackendEventTimeWindowCheckpointingITCase() throws IOException {
super(StateBackendEnum.MEM);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
index da2bbc7..b7cbaa9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
package org.apache.flink.test.checkpointing;
+import java.io.IOException;
+
public class RocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
- public RocksDbBackendEventTimeWindowCheckpointingITCase() {
+ public RocksDbBackendEventTimeWindowCheckpointingITCase() throws IOException {
super(StateBackendEnum.ROCKSDB_FULLY_ASYNC);
}
[2/2] flink git commit: [FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start
Posted by sr...@apache.org.
[FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09caa9ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09caa9ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09caa9ff
Branch: refs/heads/release-1.3
Commit: 09caa9ffdc8168610c7d0260360c034ea87f904c
Parents: 0225db2
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Jul 25 12:04:16 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Jul 28 15:42:28 2017 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 13 +-
.../checkpoint/CheckpointCoordinator.java | 32 +-
.../runtime/checkpoint/CompletedCheckpoint.java | 3 +-
.../checkpoint/CompletedCheckpointStore.java | 5 +-
.../StandaloneCompletedCheckpointStore.java | 4 +-
.../ZooKeeperCompletedCheckpointStore.java | 12 +-
.../runtime/executiongraph/ExecutionGraph.java | 6 +-
.../state/IncrementalKeyedStateHandle.java | 68 ++-
.../runtime/state/KeyGroupsStateHandle.java | 2 +-
.../runtime/state/MultiStreamStateHandle.java | 10 +-
.../runtime/state/SharedStateRegistry.java | 52 ++-
.../state/SharedStateRegistryFactory.java | 35 ++
.../state/memory/ByteStreamStateHandle.java | 1 +
...tCoordinatorExternalizedCheckpointsTest.java | 22 +-
.../CheckpointCoordinatorFailureTest.java | 7 +-
.../CheckpointCoordinatorMasterHooksTest.java | 7 +-
.../checkpoint/CheckpointCoordinatorTest.java | 437 ++++++++++---------
.../checkpoint/CheckpointStateRestoreTest.java | 10 +-
...ZooKeeperCompletedCheckpointStoreITCase.java | 25 +-
.../ZooKeeperCompletedCheckpointStoreTest.java | 7 +-
.../state/IncrementalKeyedStateHandleTest.java | 75 +++-
.../RecoverableCompletedCheckpointStore.java | 33 +-
.../streaming/runtime/tasks/StreamTask.java | 1 -
...tractEventTimeWindowCheckpointingITCase.java | 85 +++-
...ckendEventTimeWindowCheckpointingITCase.java | 4 +-
...ckendEventTimeWindowCheckpointingITCase.java | 4 +-
...ckendEventTimeWindowCheckpointingITCase.java | 4 +-
...ckendEventTimeWindowCheckpointingITCase.java | 51 +++
...ckendEventTimeWindowCheckpointingITCase.java | 4 +-
...ckendEventTimeWindowCheckpointingITCase.java | 4 +-
...ckendEventTimeWindowCheckpointingITCase.java | 4 +-
31 files changed, 688 insertions(+), 339 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index a6b53ec..7e0910e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -231,7 +231,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.restoredKvStateMetaInfos = new HashMap<>();
this.materializedSstFiles = new TreeMap<>();
this.backendUID = UUID.randomUUID();
- LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
+
+ LOG.debug("Setting initial backend ID in RocksDBKeyedStateBackend for operator {} to {}.",
+ this.operatorIdentifier,
+ this.backendUID);
}
/**
@@ -835,11 +838,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
void takeSnapshot() throws Exception {
assert (Thread.holdsLock(stateBackend.asyncSnapshotLock));
+ final long lastCompletedCheckpoint;
+
// use the last completed checkpoint as the comparison base.
synchronized (stateBackend.materializedSstFiles) {
- baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+ lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId;
+ baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
}
+ LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " +
+ "assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
+
// save meta data
for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
: stateBackend.kvStateInformation.entrySet()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 82933ac..fe94d25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.util.Preconditions;
@@ -174,8 +175,11 @@ public class CheckpointCoordinator {
@Nullable
private CheckpointStatsTracker statsTracker;
+ /** A factory for SharedStateRegistry objects */
+ private final SharedStateRegistryFactory sharedStateRegistryFactory;
+
/** Registry that tracks state which is shared across (incremental) checkpoints */
- private final SharedStateRegistry sharedStateRegistry;
+ private SharedStateRegistry sharedStateRegistry;
// --------------------------------------------------------------------------------------------
@@ -192,7 +196,8 @@ public class CheckpointCoordinator {
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
@Nullable String checkpointDirectory,
- Executor executor) {
+ Executor executor,
+ SharedStateRegistryFactory sharedStateRegistryFactory) {
// sanity checks
checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero");
@@ -230,7 +235,8 @@ public class CheckpointCoordinator {
this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
this.checkpointDirectory = checkpointDirectory;
this.executor = checkNotNull(executor);
- this.sharedStateRegistry = new SharedStateRegistry(executor);
+ this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
+ this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
this.masterHooks = new HashMap<>();
@@ -1044,10 +1050,23 @@ public class CheckpointCoordinator {
throw new IllegalStateException("CheckpointCoordinator is shut down");
}
- // Recover the checkpoints
- completedCheckpointStore.recover(sharedStateRegistry);
+ // We create a new shared state registry object, so that all pending async disposal requests from previous
+ // runs will go against the old object (where they can do no harm).
+ // This must happen under the checkpoint lock.
+ sharedStateRegistry.close();
+ sharedStateRegistry = sharedStateRegistryFactory.create(executor);
+
+ // Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery
+ completedCheckpointStore.recover();
+
+ // Now, we re-register all (shared) states from the checkpoint store with the new registry
+ for (CompletedCheckpoint completedCheckpoint : completedCheckpointStore.getAllCheckpoints()) {
+ completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
+ }
+
+ LOG.debug("Status of the shared state registry after restore: {}.", sharedStateRegistry);
- // restore from the latest checkpoint
+ // Restore from the latest checkpoint
CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
if (latest == null) {
@@ -1121,7 +1140,6 @@ public class CheckpointCoordinator {
CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint(
job, tasks, savepointPath, userClassLoader, allowNonRestored);
- savepoint.registerSharedStatesAfterRestored(sharedStateRegistry);
completedCheckpointStore.addCheckpoint(savepoint);
// Reset the checkpoint ID counter
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 56aa19d..76d1580 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -209,6 +209,8 @@ public class CompletedCheckpoint implements Serializable {
private void doDiscard() throws Exception {
+ LOG.trace("Executing discard procedure for {}.", this);
+
try {
// collect exceptions and continue cleanup
Exception exception = null;
@@ -225,7 +227,6 @@ public class CompletedCheckpoint implements Serializable {
// discard private state objects
try {
Collection<OperatorState> values = operatorStates.values();
- LOG.trace("About to discard operator states {}.", values);
StateUtil.bestEffortDiscardAllStateObjects(values);
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index 45d407e..82193b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.state.SharedStateRegistry;
import java.util.List;
@@ -33,10 +32,8 @@ public interface CompletedCheckpointStore {
*
* <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest
* available checkpoint.
- *
- * @param sharedStateRegistry the shared state registry to register recovered states.
*/
- void recover(SharedStateRegistry sharedStateRegistry) throws Exception;
+ void recover() throws Exception;
/**
* Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints.
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index fbb0198..63e7468 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.state.SharedStateRegistry;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +57,7 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
}
@Override
- public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+ public void recover() throws Exception {
// Nothing to do
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index c4cb6bc..88dd0d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -18,20 +18,21 @@
package org.apache.flink.runtime.checkpoint;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ZKPaths;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.FlinkException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -138,14 +139,13 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
* that the history of checkpoints is consistent.
*/
@Override
- public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+ public void recover() throws Exception {
LOG.info("Recovering checkpoints from ZooKeeper.");
// Clear local handles in order to prevent duplicates on
// recovery. The local handles should reflect the state
// of ZooKeeper.
completedCheckpoints.clear();
- sharedStateRegistry.clear();
// Get all there is first
List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
@@ -170,8 +170,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
try {
completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
if (completedCheckpoint != null) {
- // Re-register all shared states in the checkpoint.
- completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
completedCheckpoints.add(completedCheckpoint);
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f9d2d69..c105d2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -66,6 +66,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializedThrowable;
@@ -74,8 +75,8 @@ import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
-
import org.apache.flink.util.StringUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -459,7 +460,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
checkpointIDCounter,
checkpointStore,
checkpointDir,
- ioExecutor);
+ ioExecutor,
+ SharedStateRegistry.DEFAULT_FACTORY);
// register the master hooks on the checkpoint coordinator
for (MasterTriggerRestoreHook<?> hook : masterHooks) {
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index 0085890..0268b10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -65,27 +65,27 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
private final UUID backendIdentifier;
/**
- * The key-group range covered by this state handle
+ * The key-group range covered by this state handle.
*/
private final KeyGroupRange keyGroupRange;
/**
- * The checkpoint Id
+ * The checkpoint Id.
*/
private final long checkpointId;
/**
- * Shared state in the incremental checkpoint. This i
+ * Shared state in the incremental checkpoint.
*/
private final Map<StateHandleID, StreamStateHandle> sharedState;
/**
- * Private state in the incremental checkpoint
+ * Private state in the incremental checkpoint.
*/
private final Map<StateHandleID, StreamStateHandle> privateState;
/**
- * Primary meta data state of the incremental checkpoint
+ * Primary meta data state of the incremental checkpoint.
*/
private final StreamStateHandle metaStateHandle;
@@ -143,16 +143,21 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
@Override
public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
- if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
- return this;
- } else {
- return null;
- }
+ return KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(this.keyGroupRange.getIntersection(keyGroupRange)) ?
+ null : this;
}
@Override
public void discardState() throws Exception {
+ SharedStateRegistry registry = this.sharedStateRegistry;
+ final boolean isRegistered = (registry != null);
+
+ LOG.trace("Discarding IncrementalKeyedStateHandle (registered = {}) for checkpoint {} from backend with id {}.",
+ isRegistered,
+ checkpointId,
+ backendIdentifier);
+
try {
metaStateHandle.discardState();
} catch (Exception e) {
@@ -168,19 +173,20 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
// If this was not registered, we can delete the shared state. We can simply apply this
// to all handles, because all handles that have not been created for the first time for this
// are only placeholders at this point (disposing them is a NOP).
- if (sharedStateRegistry == null) {
- try {
- StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
- } catch (Exception e) {
- LOG.warn("Could not properly discard new sst file states.", e);
- }
- } else {
+ if (isRegistered) {
// If this was registered, we only unregister all our referenced shared states
// from the registry.
for (StateHandleID stateHandleID : sharedState.keySet()) {
- sharedStateRegistry.unregisterReference(
+ registry.unregisterReference(
createSharedStateRegistryKeyFromFileName(stateHandleID));
}
+ } else {
+ // Otherwise, we assume that we own those handles and dispose them directly.
+ try {
+ StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
+ } catch (Exception e) {
+ LOG.warn("Could not properly discard new sst file states.", e);
+ }
}
}
@@ -202,10 +208,21 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
@Override
public void registerSharedStates(SharedStateRegistry stateRegistry) {
- Preconditions.checkState(sharedStateRegistry == null, "The state handle has already registered its shared states.");
+ // This is a quick check to avoid registering twice with the same registry. However, the code allows
+ // registering again with a different registry. The implication is that ownership is transferred to this new
+ // registry. This should only happen in case of a restart, when the CheckpointCoordinator creates a new
+ // SharedStateRegistry for the current attempt and the old registry becomes meaningless. We also assume that
+ // an old registry object from a previous run is due to be GCed and will never be used for registration again.
+ Preconditions.checkState(
+ sharedStateRegistry != stateRegistry,
+ "The state handle has already registered its shared states to the given registry.");
sharedStateRegistry = Preconditions.checkNotNull(stateRegistry);
+ LOG.trace("Registering IncrementalKeyedStateHandle for checkpoint {} from backend with id {}.",
+ checkpointId,
+ backendIdentifier);
+
for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : sharedState.entrySet()) {
SharedStateRegistryKey registryKey =
createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
@@ -284,5 +301,18 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
result = 31 * result + getMetaStateHandle().hashCode();
return result;
}
+
+ @Override
+ public String toString() {
+ return "IncrementalKeyedStateHandle{" +
+ "backendIdentifier=" + backendIdentifier +
+ ", keyGroupRange=" + keyGroupRange +
+ ", checkpointId=" + checkpointId +
+ ", sharedState=" + sharedState +
+ ", privateState=" + privateState +
+ ", metaStateHandle=" + metaStateHandle +
+ ", registered=" + (sharedStateRegistry != null) +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 8e38ad4..8092f6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -141,7 +141,7 @@ public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle
public String toString() {
return "KeyGroupsStateHandle{" +
"groupRangeOffsets=" + groupRangeOffsets +
- ", data=" + stateHandle +
+ ", stateHandle=" + stateHandle +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
index b95dace..1960c1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
@@ -38,7 +38,7 @@ public class MultiStreamStateHandle implements StreamStateHandle {
private final List<StreamStateHandle> stateHandles;
private final long stateSize;
- public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) throws IOException {
+ public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) {
this.stateHandles = Preconditions.checkNotNull(stateHandles);
long calculateSize = 0L;
for(StreamStateHandle stateHandle : stateHandles) {
@@ -62,6 +62,14 @@ public class MultiStreamStateHandle implements StreamStateHandle {
return stateSize;
}
+ @Override
+ public String toString() {
+ return "MultiStreamStateHandle{" +
+ "stateHandles=" + stateHandles +
+ ", stateSize=" + stateSize +
+ '}';
+ }
+
static final class MultiFSDataInputStream extends AbstractMultiFSDataInputStream {
private final TreeMap<Long, StreamStateHandle> stateHandleMap;
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index e0ca873..347f30c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -38,13 +38,24 @@ import java.util.concurrent.Executor;
* maintain the reference count of {@link StreamStateHandle}s by a key that (logically) identifies
* them.
*/
-public class SharedStateRegistry {
+public class SharedStateRegistry implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class);
+ /** A singleton object for the default implementation of a {@link SharedStateRegistryFactory} */
+ public static final SharedStateRegistryFactory DEFAULT_FACTORY = new SharedStateRegistryFactory() {
+ @Override
+ public SharedStateRegistry create(Executor deleteExecutor) {
+ return new SharedStateRegistry(deleteExecutor);
+ }
+ };
+
/** All registered state objects by an artificial key */
private final Map<SharedStateRegistryKey, SharedStateRegistry.SharedStateEntry> registeredStates;
+ /** This flag indicates whether the registry is open or close() has been called */
+ private boolean open;
+
/** Executor for async state deletion */
private final Executor asyncDisposalExecutor;
@@ -56,6 +67,7 @@ public class SharedStateRegistry {
public SharedStateRegistry(Executor asyncDisposalExecutor) {
this.registeredStates = new HashMap<>();
this.asyncDisposalExecutor = Preconditions.checkNotNull(asyncDisposalExecutor);
+ this.open = true;
}
/**
@@ -82,6 +94,9 @@ public class SharedStateRegistry {
SharedStateRegistry.SharedStateEntry entry;
synchronized (registeredStates) {
+
+ Preconditions.checkState(open, "Attempt to register state to closed SharedStateRegistry.");
+
entry = registeredStates.get(registrationKey);
if (entry == null) {
@@ -96,6 +111,11 @@ public class SharedStateRegistry {
// delete if this is a real duplicate
if (!Objects.equals(state, entry.stateHandle)) {
scheduledStateDeletion = state;
+ LOG.trace("Identified duplicate state registration under key {}. New state {} was determined to " +
+ "be an unnecessary copy of existing state {} and will be dropped.",
+ registrationKey,
+ state,
+ entry.stateHandle);
}
entry.increaseReferenceCount();
}
@@ -112,7 +132,8 @@ public class SharedStateRegistry {
*
* @param registrationKey the shared state for which we release a reference.
* @return the result of the request, consisting of the reference count after this operation
- * and the state handle, or null if the state handle was deleted through this request.
+ * and the state handle, or null if the state handle was deleted through this request. Returns null if the registry
+ * was previously closed.
*/
public Result unregisterReference(SharedStateRegistryKey registrationKey) {
@@ -123,6 +144,7 @@ public class SharedStateRegistry {
SharedStateRegistry.SharedStateEntry entry;
synchronized (registeredStates) {
+
entry = registeredStates.get(registrationKey);
Preconditions.checkState(entry != null,
@@ -164,10 +186,18 @@ public class SharedStateRegistry {
}
}
+ @Override
+ public String toString() {
+ synchronized (registeredStates) {
+ return "SharedStateRegistry{" +
+ "registeredStates=" + registeredStates +
+ '}';
+ }
+ }
+
private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
// We do the small optimization to not issue discards for placeholders, which are NOPs.
if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
-
LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
asyncDisposalExecutor.execute(
new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
@@ -178,6 +208,13 @@ public class SharedStateRegistry {
return stateHandle instanceof PlaceholderStreamStateHandle;
}
+ @Override
+ public void close() {
+ synchronized (registeredStates) {
+ open = false;
+ }
+ }
+
/**
* An entry in the registry, tracking the handle and the corresponding reference count.
*/
@@ -279,13 +316,4 @@ public class SharedStateRegistry {
}
}
}
-
- /**
- * Clears the registry.
- */
- public void clear() {
- synchronized (registeredStates) {
- registeredStates.clear();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
new file mode 100644
index 0000000..05c9825
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Simple factory to produce {@link SharedStateRegistry} objects.
+ */
+public interface SharedStateRegistryFactory {
+
+ /**
+ * Factory method for {@link SharedStateRegistry}.
+ *
+ * @param deleteExecutor executor used to run (async) deletes.
+ * @return a SharedStateRegistry object
+ */
+ SharedStateRegistry create(Executor deleteExecutor);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index 9ba9d35..3a43d4f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -95,6 +95,7 @@ public class ByteStreamStateHandle implements StreamStateHandle {
public String toString() {
return "ByteStreamStateHandle{" +
"handleName='" + handleName + '\'' +
+ ", dataBytes=" + data.length +
'}';
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
index d293eea..edc29fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
@@ -18,14 +18,6 @@
package org.apache.flink.runtime.checkpoint;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
import org.apache.flink.runtime.concurrent.Executors;
@@ -37,11 +29,22 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
/**
* CheckpointCoordinator tests for externalized checkpoints.
*
@@ -91,7 +94,8 @@ public class CheckpointCoordinatorExternalizedCheckpointsTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
checkpointDir.getAbsolutePath(),
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 344b340..5cca94f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -78,7 +78,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new FailingCompletedCheckpointStore(),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
coord.triggerCheckpoint(triggerTimestamp, false);
@@ -113,7 +114,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
when(subtaskState.getManagedKeyedState()).thenReturn(managedRawHandle);
AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId, new CheckpointMetrics(), subtaskState);
-
+
try {
coord.receiveAcknowledgeMessage(acknowledgeMessage);
fail("Expected a checkpoint exception because the completed checkpoint store could not " +
@@ -136,7 +137,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
@Override
- public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+ public void recover() throws Exception {
throw new UnsupportedOperationException("Not implemented.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index d6daa4e..94063a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -30,9 +30,9 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
import org.junit.Test;
-
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -47,14 +47,12 @@ import java.util.List;
import java.util.concurrent.Executor;
import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.mockExecutionVertex;
-
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.any;
@@ -405,7 +403,8 @@ public class CheckpointCoordinatorMasterHooksTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(10),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
}
private static <T> T mockGeneric(Class<?> clazz) {
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 186a819..16a89ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.checkpoint;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -56,6 +54,9 @@ import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -140,7 +141,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// nothing should be happening
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -200,7 +202,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// nothing should be happening
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -251,7 +254,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// nothing should be happening
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -303,7 +307,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -407,7 +412,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -526,7 +532,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -698,7 +705,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -828,7 +836,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(10),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -992,7 +1001,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// trigger a checkpoint, partially acknowledged
assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1019,8 +1029,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
Thread.sleep(250);
}
while (!checkpoint.isDiscarded() &&
- coord.getNumberOfPendingCheckpoints() > 0 &&
- System.currentTimeMillis() < deadline);
+ coord.getNumberOfPendingCheckpoints() > 0 &&
+ System.currentTimeMillis() < deadline);
assertTrue("Checkpoint was not canceled by the timeout", checkpoint.isDiscarded());
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -1071,7 +1081,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1134,7 +1145,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1274,7 +1286,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
coord.startCheckpointScheduler();
@@ -1296,7 +1309,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
int numCallsSoFar = numCalls.get();
Thread.sleep(400);
assertTrue(numCallsSoFar == numCalls.get() ||
- numCallsSoFar+1 == numCalls.get());
+ numCallsSoFar+1 == numCalls.get());
// start another sequence of periodic scheduling
numCalls.set(0);
@@ -1318,7 +1331,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
numCallsSoFar = numCalls.get();
Thread.sleep(400);
assertTrue(numCallsSoFar == numCalls.get() ||
- numCallsSoFar + 1 == numCalls.get());
+ numCallsSoFar + 1 == numCalls.get());
coord.shutdown(JobStatus.FINISHED);
}
@@ -1354,19 +1367,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
final long delay = 50;
final CheckpointCoordinator coord = new CheckpointCoordinator(
- jid,
- 2, // periodic interval is 2 ms
- 200_000, // timeout is very long (200 s)
- delay, // 50 ms delay between checkpoints
- 1,
- ExternalizedCheckpointSettings.none(),
- new ExecutionVertex[] { vertex },
- new ExecutionVertex[] { vertex },
- new ExecutionVertex[] { vertex },
- new StandaloneCheckpointIDCounter(),
- new StandaloneCompletedCheckpointStore(2),
- "dummy-path",
- Executors.directExecutor());
+ jid,
+ 2, // periodic interval is 2 ms
+ 200_000, // timeout is very long (200 s)
+ delay, // 50 ms delay between checkpoints
+ 1,
+ ExternalizedCheckpointSettings.none(),
+ new ExecutionVertex[] { vertex },
+ new ExecutionVertex[] { vertex },
+ new ExecutionVertex[] { vertex },
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(2),
+ "dummy-path",
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
try {
coord.startCheckpointScheduler();
@@ -1439,7 +1453,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1596,7 +1611,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
counter,
new StandaloneCompletedCheckpointStore(10),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
String savepointDir = tmpFolder.newFolder().getAbsolutePath();
@@ -1702,7 +1718,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
coord.startCheckpointScheduler();
@@ -1715,12 +1732,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
Thread.sleep(20);
}
while ((now = System.currentTimeMillis()) < minDuration ||
- (numCalls.get() < maxConcurrentAttempts && now < timeout));
+ (numCalls.get() < maxConcurrentAttempts && now < timeout));
assertEquals(maxConcurrentAttempts, numCalls.get());
verify(triggerVertex.getCurrentExecutionAttempt(), times(maxConcurrentAttempts))
- .triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
+ .triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
// now, once we acknowledge one checkpoint, it should trigger the next one
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L));
@@ -1775,7 +1792,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
coord.startCheckpointScheduler();
@@ -1788,7 +1806,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
Thread.sleep(20);
}
while ((now = System.currentTimeMillis()) < minDuration ||
- (coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts && now < timeout));
+ (coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts && now < timeout));
// validate that the pending checkpoints are there
assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
@@ -1806,7 +1824,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
Thread.sleep(20);
}
while (coord.getPendingCheckpoints().get(4L) == null &&
- System.currentTimeMillis() < newTimeout);
+ System.currentTimeMillis() < newTimeout);
// do the final check
assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
@@ -1837,12 +1855,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
final AtomicReference<ExecutionState> currentState = new AtomicReference<>(ExecutionState.CREATED);
when(triggerVertex.getCurrentExecutionAttempt().getState()).thenAnswer(
- new Answer<ExecutionState>() {
- @Override
- public ExecutionState answer(InvocationOnMock invocation){
- return currentState.get();
- }
- });
+ new Answer<ExecutionState>() {
+ @Override
+ public ExecutionState answer(InvocationOnMock invocation){
+ return currentState.get();
+ }
+ });
CheckpointCoordinator coord = new CheckpointCoordinator(
jid,
@@ -1857,7 +1875,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
coord.startCheckpointScheduler();
@@ -1874,7 +1893,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
Thread.sleep(20);
}
while (System.currentTimeMillis() < timeout &&
- coord.getNumberOfPendingCheckpoints() == 0);
+ coord.getNumberOfPendingCheckpoints() == 0);
assertTrue(coord.getNumberOfPendingCheckpoints() > 0);
}
@@ -1909,7 +1928,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
checkpointIDCounter,
new StandaloneCompletedCheckpointStore(2),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
List<Future<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
@@ -1962,7 +1982,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
String savepointDir = tmpFolder.newFolder().getAbsolutePath();
@@ -2006,7 +2027,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
ExecutionVertex[] arrayExecutionVertices =
- allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
+ allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
CompletedCheckpointStore store = new RecoverableCompletedCheckpointStore();
@@ -2024,7 +2045,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
store,
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// trigger the checkpoint
coord.triggerCheckpoint(timestamp, false);
@@ -2051,11 +2073,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
SubtaskState subtaskState = mockSubtaskState(jobVertexID1, index, keyGroupPartitions1.get(index));
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- jid,
- jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
- new CheckpointMetrics(),
- subtaskState);
+ jid,
+ jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+ checkpointId,
+ new CheckpointMetrics(),
+ subtaskState);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
}
@@ -2064,11 +2086,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
SubtaskState subtaskState = mockSubtaskState(jobVertexID2, index, keyGroupPartitions2.get(index));
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- jid,
- jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
- new CheckpointMetrics(),
- subtaskState);
+ jid,
+ jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+ checkpointId,
+ new CheckpointMetrics(),
+ subtaskState);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
}
@@ -2150,7 +2172,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// trigger the checkpoint
coord.triggerCheckpoint(timestamp, false);
@@ -2167,11 +2190,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
KeyGroupsStateHandle keyGroupState = generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false);
SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- jid,
- jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
- new CheckpointMetrics(),
- checkpointStateHandles);
+ jid,
+ jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+ checkpointId,
+ new CheckpointMetrics(),
+ checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
}
@@ -2182,11 +2205,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
KeyGroupsStateHandle keyGroupState = generateKeyGroupState(jobVertexID2, keyGroupPartitions2.get(index), false);
SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- jid,
- jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
- new CheckpointMetrics(),
- checkpointStateHandles);
+ jid,
+ jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+ checkpointId,
+ new CheckpointMetrics(),
+ checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
}
@@ -2251,7 +2274,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
ExecutionVertex[] arrayExecutionVertices =
- allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
+ allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -2267,7 +2290,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// trigger the checkpoint
coord.triggerCheckpoint(timestamp, false);
@@ -2277,22 +2301,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
List<KeyGroupRange> keyGroupPartitions1 =
- StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
+ StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
List<KeyGroupRange> keyGroupPartitions2 =
- StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
+ StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
for (int index = 0; index < jobVertex1.getParallelism(); index++) {
ChainedStateHandle<StreamStateHandle> valueSizeTuple = generateStateForVertex(jobVertexID1, index);
KeyGroupsStateHandle keyGroupState = generateKeyGroupState(
- jobVertexID1, keyGroupPartitions1.get(index), false);
+ jobVertexID1, keyGroupPartitions1.get(index), false);
SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- jid,
- jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
- new CheckpointMetrics(),
- checkpointStateHandles);
+ jid,
+ jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+ checkpointId,
+ new CheckpointMetrics(),
+ checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
}
@@ -2302,15 +2326,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
ChainedStateHandle<StreamStateHandle> state = generateStateForVertex(jobVertexID2, index);
KeyGroupsStateHandle keyGroupState = generateKeyGroupState(
- jobVertexID2, keyGroupPartitions2.get(index), false);
+ jobVertexID2, keyGroupPartitions2.get(index), false);
SubtaskState checkpointStateHandles = new SubtaskState(state, null, null, keyGroupState, null);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- jid,
- jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
- new CheckpointMetrics(),
- checkpointStateHandles);
+ jid,
+ jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+ checkpointId,
+ new CheckpointMetrics(),
+ checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
}
@@ -2390,13 +2414,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
int newParallelism2 = scaleOut ? 13 : 2;
final ExecutionJobVertex jobVertex1 = mockExecutionJobVertex(
- jobVertexID1,
- parallelism1,
- maxParallelism1);
+ jobVertexID1,
+ parallelism1,
+ maxParallelism1);
final ExecutionJobVertex jobVertex2 = mockExecutionJobVertex(
- jobVertexID2,
- parallelism2,
- maxParallelism2);
+ jobVertexID2,
+ parallelism2,
+ maxParallelism2);
List<ExecutionVertex> allExecutionVertices = new ArrayList<>(parallelism1 + parallelism2);
@@ -2404,7 +2428,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
ExecutionVertex[] arrayExecutionVertices =
- allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
+ allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -2420,7 +2444,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// trigger the checkpoint
coord.triggerCheckpoint(timestamp, false);
@@ -2430,9 +2455,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
List<KeyGroupRange> keyGroupPartitions1 =
- StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
+ StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
List<KeyGroupRange> keyGroupPartitions2 =
- StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
+ StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
//vertex 1
for (int index = 0; index < jobVertex1.getParallelism(); index++) {
@@ -2443,11 +2468,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, opStateBackend, null, keyedStateBackend, keyedStateRaw);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- jid,
- jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
- new CheckpointMetrics(),
- checkpointStateHandles);
+ jid,
+ jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+ checkpointId,
+ new CheckpointMetrics(),
+ checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
}
@@ -2463,14 +2488,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
expectedOpStatesBackend.add(opStateBackend);
expectedOpStatesRaw.add(opStateRaw);
SubtaskState checkpointStateHandles =
- new SubtaskState(new ChainedStateHandle<>(
- Collections.<StreamStateHandle>singletonList(null)), opStateBackend, opStateRaw, keyedStateBackend, keyedStateRaw);
+ new SubtaskState(new ChainedStateHandle<>(
+ Collections.<StreamStateHandle>singletonList(null)), opStateBackend, opStateRaw, keyedStateBackend, keyedStateRaw);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- jid,
- jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
- new CheckpointMetrics(),
- checkpointStateHandles);
+ jid,
+ jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+ checkpointId,
+ new CheckpointMetrics(),
+ checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
}
@@ -2482,18 +2507,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
List<KeyGroupRange> newKeyGroupPartitions2 =
- StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, newParallelism2);
+ StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, newParallelism2);
final ExecutionJobVertex newJobVertex1 = mockExecutionJobVertex(
- jobVertexID1,
- parallelism1,
- maxParallelism1);
+ jobVertexID1,
+ parallelism1,
+ maxParallelism1);
// rescale vertex 2
final ExecutionJobVertex newJobVertex2 = mockExecutionJobVertex(
- jobVertexID2,
- newParallelism2,
- maxParallelism2);
+ jobVertexID2,
+ newParallelism2,
+ maxParallelism2);
tasks.put(jobVertexID1, newJobVertex1);
tasks.put(jobVertexID2, newJobVertex2);
@@ -2534,7 +2559,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
return new Tuple2<>(jobVertexID, operatorID);
}
-
+
/**
* old topology
* [operator1,operator2] * parallelism1 -> [operator3,operator4] * parallelism2
@@ -2575,7 +2600,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
OperatorState taskState = new OperatorState(id.f1, parallelism1, maxParallelism1);
operatorStates.put(id.f1, taskState);
for (int index = 0; index < taskState.getParallelism(); index++) {
- StreamStateHandle subNonPartitionedState =
+ StreamStateHandle subNonPartitionedState =
generateStateForVertex(id.f0, index)
.get(0);
OperatorStateHandle subManagedOperatorState =
@@ -2673,15 +2698,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
spy(new StandaloneCompletedCheckpointStore(1));
CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(
- jobID,
- 2,
- System.currentTimeMillis(),
- System.currentTimeMillis() + 3000,
- operatorStates,
- Collections.<MasterState>emptyList(),
- CheckpointProperties.forStandardCheckpoint(),
- null,
- null);
+ jobID,
+ 2,
+ System.currentTimeMillis(),
+ System.currentTimeMillis() + 3000,
+ operatorStates,
+ Collections.<MasterState>emptyList(),
+ CheckpointProperties.forStandardCheckpoint(),
+ null,
+ null);
when(standaloneCompletedCheckpointStore.getLatestCheckpoint()).thenReturn(completedCheckpoint);
@@ -2699,7 +2724,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
standaloneCompletedCheckpointStore,
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
coord.restoreLatestCheckpointedState(tasks, false, true);
@@ -2832,7 +2858,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
"fake-directory",
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -2862,14 +2889,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
List<Collection<OperatorStateHandle>> repartitionedStates =
- repartitioner.repartitionState(Collections.singletonList(osh), 3);
+ repartitioner.repartitionState(Collections.singletonList(osh), 3);
Map<String, Integer> checkCounts = new HashMap<>(3);
for (Collection<OperatorStateHandle> operatorStateHandles : repartitionedStates) {
for (OperatorStateHandle operatorStateHandle : operatorStateHandles) {
for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> stateNameToMetaInfo :
- operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) {
+ operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) {
String stateName = stateNameToMetaInfo.getKey();
Integer count = checkCounts.get(stateName);
@@ -2900,8 +2927,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
// ------------------------------------------------------------------------
public static KeyGroupsStateHandle generateKeyGroupState(
- JobVertexID jobVertexID,
- KeyGroupRange keyGroupPartition, boolean rawState) throws IOException {
+ JobVertexID jobVertexID,
+ KeyGroupRange keyGroupPartition, boolean rawState) throws IOException {
List<Integer> testStatesLists = new ArrayList<>(keyGroupPartition.getNumberOfKeyGroups());
@@ -2918,27 +2945,27 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
public static KeyGroupsStateHandle generateKeyGroupState(
- KeyGroupRange keyGroupRange,
- List<? extends Serializable> states) throws IOException {
+ KeyGroupRange keyGroupRange,
+ List<? extends Serializable> states) throws IOException {
Preconditions.checkArgument(keyGroupRange.getNumberOfKeyGroups() == states.size());
Tuple2<byte[], List<long[]>> serializedDataWithOffsets =
- serializeTogetherAndTrackOffsets(Collections.<List<? extends Serializable>>singletonList(states));
+ serializeTogetherAndTrackOffsets(Collections.<List<? extends Serializable>>singletonList(states));
KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, serializedDataWithOffsets.f1.get(0));
ByteStreamStateHandle allSerializedStatesHandle = new TestByteStreamStateHandleDeepCompare(
- String.valueOf(UUID.randomUUID()),
- serializedDataWithOffsets.f0);
+ String.valueOf(UUID.randomUUID()),
+ serializedDataWithOffsets.f0);
KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(
- keyGroupRangeOffsets,
- allSerializedStatesHandle);
+ keyGroupRangeOffsets,
+ allSerializedStatesHandle);
return keyGroupsStateHandle;
}
public static Tuple2<byte[], List<long[]>> serializeTogetherAndTrackOffsets(
- List<List<? extends Serializable>> serializables) throws IOException {
+ List<List<? extends Serializable>> serializables) throws IOException {
List<long[]> offsets = new ArrayList<>(serializables.size());
List<byte[]> serializedGroupValues = new ArrayList<>();
@@ -2962,19 +2989,19 @@ public class CheckpointCoordinatorTest extends TestLogger {
runningGroupsOffset = 0;
for (byte[] serializedGroupValue : serializedGroupValues) {
System.arraycopy(
- serializedGroupValue,
- 0,
- allSerializedValuesConcatenated,
- runningGroupsOffset,
- serializedGroupValue.length);
+ serializedGroupValue,
+ 0,
+ allSerializedValuesConcatenated,
+ runningGroupsOffset,
+ serializedGroupValue.length);
runningGroupsOffset += serializedGroupValue.length;
}
return new Tuple2<>(allSerializedValuesConcatenated, offsets);
}
public static ChainedStateHandle<StreamStateHandle> generateStateForVertex(
- JobVertexID jobVertexID,
- int index) throws IOException {
+ JobVertexID jobVertexID,
+ int index) throws IOException {
Random random = new Random(jobVertexID.hashCode() + index);
int value = random.nextInt();
@@ -2982,17 +3009,17 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
public static ChainedStateHandle<StreamStateHandle> generateChainedStateHandle(
- Serializable value) throws IOException {
+ Serializable value) throws IOException {
return ChainedStateHandle.wrapSingleHandle(
- TestByteStreamStateHandleDeepCompare.fromSerializable(String.valueOf(UUID.randomUUID()), value));
+ TestByteStreamStateHandleDeepCompare.fromSerializable(String.valueOf(UUID.randomUUID()), value));
}
public static ChainedStateHandle<OperatorStateHandle> generateChainedPartitionableStateHandle(
- JobVertexID jobVertexID,
- int index,
- int namedStates,
- int partitionsPerState,
- boolean rawState) throws IOException {
+ JobVertexID jobVertexID,
+ int index,
+ int namedStates,
+ int partitionsPerState,
+ boolean rawState) throws IOException {
Map<String, List<? extends Serializable>> statesListsMap = new HashMap<>(namedStates);
@@ -3015,7 +3042,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
private static ChainedStateHandle<OperatorStateHandle> generateChainedPartitionableStateHandle(
- Map<String, List<? extends Serializable>> states) throws IOException {
+ Map<String, List<? extends Serializable>> states) throws IOException {
List<List<? extends Serializable>> namedStateSerializables = new ArrayList<>(states.size());
@@ -3030,26 +3057,26 @@ public class CheckpointCoordinatorTest extends TestLogger {
int idx = 0;
for (Map.Entry<String, List<? extends Serializable>> entry : states.entrySet()) {
offsetsMap.put(
- entry.getKey(),
- new OperatorStateHandle.StateMetaInfo(
- serializationWithOffsets.f1.get(idx),
- OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
+ entry.getKey(),
+ new OperatorStateHandle.StateMetaInfo(
+ serializationWithOffsets.f1.get(idx),
+ OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
++idx;
}
ByteStreamStateHandle streamStateHandle = new TestByteStreamStateHandleDeepCompare(
- String.valueOf(UUID.randomUUID()),
- serializationWithOffsets.f0);
+ String.valueOf(UUID.randomUUID()),
+ serializationWithOffsets.f0);
OperatorStateHandle operatorStateHandle =
- new OperatorStateHandle(offsetsMap, streamStateHandle);
+ new OperatorStateHandle(offsetsMap, streamStateHandle);
return ChainedStateHandle.wrapSingleHandle(operatorStateHandle);
}
static ExecutionJobVertex mockExecutionJobVertex(
- JobVertexID jobVertexID,
- int parallelism,
- int maxParallelism) {
+ JobVertexID jobVertexID,
+ int parallelism,
+ int maxParallelism) {
return mockExecutionJobVertex(
jobVertexID,
@@ -3131,7 +3158,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
when(jobVertex.getOperatorIDs()).thenReturn(jobVertexIDs);
-
+
when(vertex.getJobVertex()).thenReturn(jobVertex);
return vertex;
@@ -3158,8 +3185,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
public static void verifyStateRestore(
- JobVertexID jobVertexID, ExecutionJobVertex executionJobVertex,
- List<KeyGroupRange> keyGroupPartitions) throws Exception {
+ JobVertexID jobVertexID, ExecutionJobVertex executionJobVertex,
+ List<KeyGroupRange> keyGroupPartitions) throws Exception {
for (int i = 0; i < executionJobVertex.getParallelism(); i++) {
@@ -3168,28 +3195,28 @@ public class CheckpointCoordinatorTest extends TestLogger {
ChainedStateHandle<StreamStateHandle> expectNonPartitionedState = generateStateForVertex(jobVertexID, i);
ChainedStateHandle<StreamStateHandle> actualNonPartitionedState = taskStateHandles.getLegacyOperatorState();
assertTrue(CommonTestUtils.isSteamContentEqual(
- expectNonPartitionedState.get(0).openInputStream(),
- actualNonPartitionedState.get(0).openInputStream()));
+ expectNonPartitionedState.get(0).openInputStream(),
+ actualNonPartitionedState.get(0).openInputStream()));
ChainedStateHandle<OperatorStateHandle> expectedOpStateBackend =
- generateChainedPartitionableStateHandle(jobVertexID, i, 2, 8, false);
+ generateChainedPartitionableStateHandle(jobVertexID, i, 2, 8, false);
List<Collection<OperatorStateHandle>> actualPartitionableState = taskStateHandles.getManagedOperatorState();
assertTrue(CommonTestUtils.isSteamContentEqual(
- expectedOpStateBackend.get(0).openInputStream(),
- actualPartitionableState.get(0).iterator().next().openInputStream()));
+ expectedOpStateBackend.get(0).openInputStream(),
+ actualPartitionableState.get(0).iterator().next().openInputStream()));
KeyGroupsStateHandle expectPartitionedKeyGroupState = generateKeyGroupState(
- jobVertexID, keyGroupPartitions.get(i), false);
+ jobVertexID, keyGroupPartitions.get(i), false);
Collection<KeyedStateHandle> actualPartitionedKeyGroupState = taskStateHandles.getManagedKeyedState();
compareKeyedState(Collections.singletonList(expectPartitionedKeyGroupState), actualPartitionedKeyGroupState);
}
}
public static void compareKeyedState(
- Collection<KeyGroupsStateHandle> expectPartitionedKeyGroupState,
- Collection<? extends KeyedStateHandle> actualPartitionedKeyGroupState) throws Exception {
+ Collection<KeyGroupsStateHandle> expectPartitionedKeyGroupState,
+ Collection<? extends KeyedStateHandle> actualPartitionedKeyGroupState) throws Exception {
KeyGroupsStateHandle expectedHeadOpKeyGroupStateHandle = expectPartitionedKeyGroupState.iterator().next();
int expectedTotalKeyGroups = expectedHeadOpKeyGroupStateHandle.getKeyGroupRange().getNumberOfKeyGroups();
@@ -3207,7 +3234,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
long offset = expectedHeadOpKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
inputStream.seek(offset);
int expectedKeyGroupState =
- InstantiationUtil.deserializeObject(inputStream, Thread.currentThread().getContextClassLoader());
+ InstantiationUtil.deserializeObject(inputStream, Thread.currentThread().getContextClassLoader());
for (KeyedStateHandle oneActualKeyedStateHandle : actualPartitionedKeyGroupState) {
assertTrue(oneActualKeyedStateHandle instanceof KeyGroupsStateHandle);
@@ -3218,7 +3245,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
try (FSDataInputStream actualInputStream = oneActualKeyGroupStateHandle.openInputStream()) {
actualInputStream.seek(actualOffset);
int actualGroupState = InstantiationUtil.
- deserializeObject(actualInputStream, Thread.currentThread().getContextClassLoader());
+ deserializeObject(actualInputStream, Thread.currentThread().getContextClassLoader());
assertEquals(expectedKeyGroupState, actualGroupState);
}
}
@@ -3228,8 +3255,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
public static void comparePartitionableState(
- List<ChainedStateHandle<OperatorStateHandle>> expected,
- List<List<Collection<OperatorStateHandle>>> actual) throws Exception {
+ List<ChainedStateHandle<OperatorStateHandle>> expected,
+ List<List<Collection<OperatorStateHandle>>> actual) throws Exception {
List<String> expectedResult = new ArrayList<>();
for (ChainedStateHandle<OperatorStateHandle> chainedStateHandle : expected) {
@@ -3263,7 +3290,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
for (long offset : entry.getValue().getOffsets()) {
in.seek(offset);
Integer state = InstantiationUtil.
- deserializeObject(in, Thread.currentThread().getContextClassLoader());
+ deserializeObject(in, Thread.currentThread().getContextClassLoader());
resultCollector.add(opIdx + " : " + entry.getKey() + " : " + state);
}
}
@@ -3308,24 +3335,25 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// Periodic
CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(
- System.currentTimeMillis(),
- CheckpointProperties.forStandardCheckpoint(),
- null,
- true);
+ System.currentTimeMillis(),
+ CheckpointProperties.forStandardCheckpoint(),
+ null,
+ true);
assertTrue(triggerResult.isFailure());
assertEquals(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN, triggerResult.getFailureReason());
// Not periodic
triggerResult = coord.triggerCheckpoint(
- System.currentTimeMillis(),
- CheckpointProperties.forStandardCheckpoint(),
- null,
- false);
+ System.currentTimeMillis(),
+ CheckpointProperties.forStandardCheckpoint(),
+ null,
+ false);
assertFalse(triggerResult.isFailure());
}
@@ -3352,12 +3380,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
int maxPartitionsPerState = 1 + r.nextInt(9);
doTestPartitionableStateRepartitioning(
- r, oldParallelism, newParallelism, numNamedStates, maxPartitionsPerState);
+ r, oldParallelism, newParallelism, numNamedStates, maxPartitionsPerState);
}
}
private void doTestPartitionableStateRepartitioning(
- Random r, int oldParallelism, int newParallelism, int numNamedStates, int maxPartitionsPerState) {
+ Random r, int oldParallelism, int newParallelism, int numNamedStates, int maxPartitionsPerState) {
List<OperatorStateHandle> previousParallelOpInstanceStates = new ArrayList<>(oldParallelism);
@@ -3374,15 +3402,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
OperatorStateHandle.Mode mode = r.nextInt(10) == 0 ?
- OperatorStateHandle.Mode.BROADCAST : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
+ OperatorStateHandle.Mode.BROADCAST : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
namedStatesToOffsets.put(
- "State-" + s,
- new OperatorStateHandle.StateMetaInfo(offs, mode));
+ "State-" + s,
+ new OperatorStateHandle.StateMetaInfo(offs, mode));
}
previousParallelOpInstanceStates.add(
- new OperatorStateHandle(namedStatesToOffsets, new FileStateHandle(fakePath, -1)));
+ new OperatorStateHandle(namedStatesToOffsets, new FileStateHandle(fakePath, -1)));
}
Map<StreamStateHandle, Map<String, List<Long>>> expected = new HashMap<>();
@@ -3395,7 +3423,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
long[] offs = e.getValue().getOffsets();
int replication = e.getValue().getDistributionMode().equals(OperatorStateHandle.Mode.BROADCAST) ?
- newParallelism : 1;
+ newParallelism : 1;
expectedTotalPartitions += replication * offs.length;
List<Long> offsList = new ArrayList<>(offs.length);
@@ -3413,7 +3441,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
List<Collection<OperatorStateHandle>> pshs =
- repartitioner.repartitionState(previousParallelOpInstanceStates, newParallelism);
+ repartitioner.repartitionState(previousParallelOpInstanceStates, newParallelism);
Map<StreamStateHandle, Map<String, List<Long>>> actual = new HashMap<>();
@@ -3486,7 +3514,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
coord.setCheckpointStatsTracker(tracker);
@@ -3524,7 +3553,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
store,
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
store.addCheckpoint(new CompletedCheckpoint(
new JobID(),
@@ -3580,7 +3610,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
checkpointIDCounter,
completedCheckpointStore,
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// trigger a first checkpoint
assertTrue(
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 7d24568..0888cff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.util.SerializableObject;
@@ -109,7 +110,8 @@ public class CheckpointStateRestoreTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// create ourselves a checkpoint with state
final long timestamp = 34623786L;
@@ -183,7 +185,8 @@ public class CheckpointStateRestoreTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
try {
coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);
@@ -240,7 +243,8 @@ public class CheckpointStateRestoreTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
StreamStateHandle serializedState = CheckpointCoordinatorTest
.generateChainedStateHandle(new SerializableObject())