You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/08/31 17:28:35 UTC
[17/27] flink git commit: [FLINK-4381] Refactor State to Prepare For
Key-Group State Backends
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index dea3452..d62b13e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -22,7 +22,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.utils.ZKPaths;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.util.InstantiationUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -40,9 +40,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* State handles backed by ZooKeeper.
*
- * <p>Added state is persisted via {@link StateHandle}s, which in turn are written to
- * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper
- * small. ZooKeeper is build for data in the KB range whereas state can grow to multiple MBs.
+ * <p>Added state is persisted via {@link RetrievableStateHandle RetrievableStateHandles},
+ * which in turn are written to ZooKeeper. This level of indirection is necessary to keep the
+ * amount of data in ZooKeeper small. ZooKeeper is built for data in the KB range whereas
+ * state can grow to multiple MBs.
*
* <p>State modifications require some care, because it is possible that certain failures bring
* the state handle backend and ZooKeeper out of sync.
@@ -72,7 +73,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
/** Curator ZooKeeper client */
private final CuratorFramework client;
- private final StateStorageHelper<T> storage;
+ private final RetrievableStateStorageHelper<T> storage;
/**
* Creates a {@link ZooKeeperStateHandleStore}.
@@ -84,7 +85,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
*/
public ZooKeeperStateHandleStore(
CuratorFramework client,
- StateStorageHelper storage) throws IOException {
+ RetrievableStateStorageHelper<T> storage) throws IOException {
this.client = checkNotNull(client, "Curator client");
this.storage = checkNotNull(storage, "State storage");
@@ -94,9 +95,9 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
* Creates a state handle and stores it in ZooKeeper with create mode {@link
* CreateMode#PERSISTENT}.
*
- * @see #add(String, Serializable, CreateMode)
+ * @see #add(String, T, CreateMode)
*/
- public StateHandle<T> add(String pathInZooKeeper, T state) throws Exception {
+ public RetrievableStateHandle<T> add(String pathInZooKeeper, T state) throws Exception {
return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
}
@@ -111,39 +112,39 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
* start with a '/')
* @param state State to be added
* @param createMode The create mode for the new path in ZooKeeper
- * @return Created {@link StateHandle}
+ *
+ * @return The created {@link RetrievableStateHandle}.
* @throws Exception If a ZooKeeper or state handle operation fails
*/
- public StateHandle<T> add(
+ public RetrievableStateHandle<T> add(
String pathInZooKeeper,
T state,
CreateMode createMode) throws Exception {
checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
checkNotNull(state, "State");
- StateHandle<T> stateHandle = storage.store(state);
+ RetrievableStateHandle<T> storeHandle = storage.store(state);
boolean success = false;
try {
// Serialize the state handle. This writes the state to the backend.
- byte[] serializedStateHandle = InstantiationUtil.serializeObject(stateHandle);
+ byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle);
// Write state handle (not the actual state) to ZooKeeper. This is expected to be
// smaller than the state itself. This level of indirection makes sure that data in
// ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
// the state can be larger.
- client.create().withMode(createMode).forPath(pathInZooKeeper, serializedStateHandle);
+ client.create().withMode(createMode).forPath(pathInZooKeeper, serializedStoreHandle);
success = true;
-
- return stateHandle;
+ return storeHandle;
}
finally {
if (!success) {
// Cleanup the state handle if it was not written to ZooKeeper.
- if (stateHandle != null) {
- stateHandle.discardState();
+ if (storeHandle != null) {
+ storeHandle.discardState();
}
}
}
@@ -161,31 +162,29 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
checkNotNull(state, "State");
- StateHandle<T> oldStateHandle = get(pathInZooKeeper);
+ RetrievableStateHandle<T> oldStateHandle = get(pathInZooKeeper);
- StateHandle<T> stateHandle = storage.store(state);
+ RetrievableStateHandle<T> newStateHandle = storage.store(state);
boolean success = false;
try {
// Serialize the new state handle. This writes the state to the backend.
- byte[] serializedStateHandle = InstantiationUtil.serializeObject(stateHandle);
+ byte[] serializedStateHandle = InstantiationUtil.serializeObject(newStateHandle);
// Replace state handle in ZooKeeper.
client.setData()
.withVersion(expectedVersion)
.forPath(pathInZooKeeper, serializedStateHandle);
-
success = true;
- }
- finally {
- if (success) {
+ } finally {
+ if(success) {
oldStateHandle.discardState();
- }
- else {
- stateHandle.discardState();
+ } else {
+ newStateHandle.discardState();
}
}
+
}
/**
@@ -216,13 +215,11 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
* @throws Exception If a ZooKeeper or state handle operation fails
*/
@SuppressWarnings("unchecked")
- public StateHandle<T> get(String pathInZooKeeper) throws Exception {
+ public RetrievableStateHandle<T> get(String pathInZooKeeper) throws Exception {
checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
byte[] data = client.getData().forPath(pathInZooKeeper);
-
- return (StateHandle<T>) InstantiationUtil
- .deserializeObject(data, ClassLoader.getSystemClassLoader());
+ return InstantiationUtil.deserializeObject(data);
}
/**
@@ -234,8 +231,8 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
* @throws Exception If a ZooKeeper or state handle operation fails
*/
@SuppressWarnings("unchecked")
- public List<Tuple2<StateHandle<T>, String>> getAll() throws Exception {
- final List<Tuple2<StateHandle<T>, String>> stateHandles = new ArrayList<>();
+ public List<Tuple2<RetrievableStateHandle<T>, String>> getAll() throws Exception {
+ final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>();
boolean success = false;
@@ -254,7 +251,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
path = "/" + path;
try {
- final StateHandle<T> stateHandle = get(path);
+ final RetrievableStateHandle<T> stateHandle = get(path);
stateHandles.add(new Tuple2<>(stateHandle, path));
} catch (KeeperException.NoNodeException ignored) {
// Concurrent deletion, retry
@@ -272,6 +269,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
return stateHandles;
}
+
/**
* Gets all available state handles from ZooKeeper sorted by name (ascending).
*
@@ -281,8 +279,8 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
* @throws Exception If a ZooKeeper or state handle operation fails
*/
@SuppressWarnings("unchecked")
- public List<Tuple2<StateHandle<T>, String>> getAllSortedByName() throws Exception {
- final List<Tuple2<StateHandle<T>, String>> stateHandles = new ArrayList<>();
+ public List<Tuple2<RetrievableStateHandle<T>, String>> getAllSortedByName() throws Exception {
+ final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>();
boolean success = false;
@@ -303,7 +301,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
path = "/" + path;
try {
- final StateHandle<T> stateHandle = get(path);
+ final RetrievableStateHandle<T> stateHandle = get(path);
stateHandles.add(new Tuple2<>(stateHandle, path));
} catch (KeeperException.NoNodeException ignored) {
// Concurrent deletion, retry
@@ -364,7 +362,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
public void removeAndDiscardState(String pathInZooKeeper) throws Exception {
checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
- StateHandle<T> stateHandle = get(pathInZooKeeper);
+ RetrievableStateHandle<T> stateHandle = get(pathInZooKeeper);
// Delete the state handle from ZooKeeper first
client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper);
@@ -381,7 +379,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
* @throws Exception If a ZooKeeper or state handle operation fails
*/
public void removeAndDiscardAllState() throws Exception {
- final List<Tuple2<StateHandle<T>, String>> allStateHandles = getAll();
+ final List<Tuple2<RetrievableStateHandle<T>, String>> allStateHandles = getAll();
ZKPaths.deleteChildren(
client.getZookeeperClient().getZooKeeper(),
@@ -389,7 +387,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
false);
// Discard the state handles only after they have been successfully deleted from ZooKeeper.
- for (Tuple2<StateHandle<T>, String> stateHandleAndPath : allStateHandles) {
+ for (Tuple2<RetrievableStateHandle<T>, String> stateHandleAndPath : allStateHandles) {
stateHandleAndPath.f0.discardState();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
index 6692ef0..a534b40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
@@ -21,22 +21,22 @@ package org.apache.flink.runtime.zookeeper.filesystem;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
-import org.apache.flink.runtime.zookeeper.StateStorageHelper;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
/**
- * {@link StateStorageHelper} implementation which stores the state in the given filesystem path.
+ * {@link RetrievableStateStorageHelper} implementation which stores the state in the given filesystem path.
*
- * @param <T>
+ * @param <T> The type of the data that can be stored by this storage helper.
*/
-public class FileSystemStateStorageHelper<T extends Serializable> implements StateStorageHelper<T> {
+public class FileSystemStateStorageHelper<T extends Serializable> implements RetrievableStateStorageHelper<T> {
private final Path rootPath;
@@ -56,7 +56,7 @@ public class FileSystemStateStorageHelper<T extends Serializable> implements Sta
}
@Override
- public StateHandle<T> store(T state) throws Exception {
+ public RetrievableStateHandle<T> store(T state) throws Exception {
Exception latestException = null;
for (int attempt = 0; attempt < 10; attempt++) {
@@ -73,8 +73,7 @@ public class FileSystemStateStorageHelper<T extends Serializable> implements Sta
try(ObjectOutputStream os = new ObjectOutputStream(outStream)) {
os.writeObject(state);
}
-
- return new FileSerializableStateHandle<>(filePath);
+ return new RetrievableStreamStateHandle<T>(filePath);
}
throw new Exception("Could not open output stream for state backend", latestException);
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 356f1a9..407fa01 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -738,29 +738,18 @@ class JobManager(
sender() ! TriggerSavepointFailure(jobId, new IllegalArgumentException("Unknown job."))
}
- case DisposeSavepoint(savepointPath, blobKeys) =>
+ case DisposeSavepoint(savepointPath) =>
val senderRef = sender()
future {
try {
log.info(s"Disposing savepoint at '$savepointPath'.")
- if (blobKeys.isDefined) {
- // We don't need a real ID here for the library cache manager
- val jid = new JobID()
+ val savepoint = savepointStore.loadSavepoint(savepointPath)
- try {
- libraryCacheManager.registerJob(jid, blobKeys.get, java.util.Collections.emptyList())
- val classLoader = libraryCacheManager.getClassLoader(jid)
+ log.debug(s"$savepoint")
- // Discard with user code loader
- savepointStore.disposeSavepoint(savepointPath, classLoader)
- } finally {
- libraryCacheManager.unregisterJob(jid)
- }
- } else {
- // Discard with system class loader
- savepointStore.disposeSavepoint(savepointPath, getClass.getClassLoader)
- }
+ // Dispose the savepoint
+ savepointStore.disposeSavepoint(savepointPath)
senderRef ! DisposeSavepointSuccess
} catch {
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 40c4dcf..5e2b547 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -490,13 +490,9 @@ object JobManagerMessages {
* Disposes a savepoint.
*
* @param savepointPath The path of the savepoint to dispose.
- * @param blobKeys BLOB keys if a user program JAR was uploaded for disposal.
- * This is required when we dispose state which contains
- * custom state instances (e.g. reducing state, rocksDB state).
*/
case class DisposeSavepoint(
- savepointPath: String,
- blobKeys: Option[java.util.List[BlobKey]] = None)
+ savepointPath: String)
extends RequiresLeaderSessionID
/** Response after a successful savepoint dispose. */
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 1816fc9..5416292 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -28,14 +28,19 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
import org.apache.flink.runtime.util.SerializableObject;
-import org.apache.flink.util.SerializedValue;
import org.junit.Test;
import org.mockito.Mockito;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -55,8 +60,11 @@ public class CheckpointStateRestoreTest {
@Test
public void testSetState() {
try {
- final SerializedValue<StateHandle<?>> serializedState = new SerializedValue<StateHandle<?>>(
- new LocalStateHandle<SerializableObject>(new SerializableObject()));
+
+ final ChainedStateHandle<StreamStateHandle> serializedState = CheckpointCoordinatorTest.generateChainedStateHandle(new SerializableObject());
+ KeyGroupRange keyGroupRange = KeyGroupRange.of(0,0);
+ List<SerializableObject> testStates = Collections.singletonList(new SerializableObject());
+ final List<KeyGroupsStateHandle> serializedKeyGroupStates = CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, testStates);
final JobID jid = new JobID();
final JobVertexID statefulId = new JobVertexID();
@@ -106,9 +114,9 @@ public class CheckpointStateRestoreTest {
PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
final long checkpointId = pending.getCheckpointId();
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, serializedState, 0));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, serializedState, 0));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, serializedState, 0));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, serializedState, serializedKeyGroupStates));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, serializedState, serializedKeyGroupStates));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, serializedState, serializedKeyGroupStates));
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));
@@ -119,11 +127,11 @@ public class CheckpointStateRestoreTest {
coord.restoreLatestCheckpointedState(map, true, false);
// verify that each stateful vertex got the state
- verify(statefulExec1, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
- verify(statefulExec2, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
- verify(statefulExec3, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
- verify(statelessExec1, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
- verify(statelessExec2, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
+ verify(statefulExec1, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<List<KeyGroupsStateHandle>>any());
+ verify(statefulExec2, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<List<KeyGroupsStateHandle>>any());
+ verify(statefulExec3, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<List<KeyGroupsStateHandle>>any());
+ verify(statelessExec1, times(0)).setInitialState(Mockito.<ChainedStateHandle<StreamStateHandle>>any(), Mockito.<List<KeyGroupsStateHandle>>any());
+ verify(statelessExec2, times(0)).setInitialState(Mockito.<ChainedStateHandle<StreamStateHandle>>any(), Mockito.<List<KeyGroupsStateHandle>>any());
}
catch (Exception e) {
e.printStackTrace();
@@ -134,8 +142,10 @@ public class CheckpointStateRestoreTest {
@Test
public void testStateOnlyPartiallyAvailable() {
try {
- final SerializedValue<StateHandle<?>> serializedState = new SerializedValue<StateHandle<?>>(
- new LocalStateHandle<SerializableObject>(new SerializableObject()));
+ final ChainedStateHandle<StreamStateHandle> serializedState = CheckpointCoordinatorTest.generateChainedStateHandle(new SerializableObject());
+ KeyGroupRange keyGroupRange = KeyGroupRange.of(0,0);
+ List<SerializableObject> testStates = Collections.singletonList(new SerializableObject());
+ final List<KeyGroupsStateHandle> serializedKeyGroupStates = CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, testStates);
final JobID jid = new JobID();
final JobVertexID statefulId = new JobVertexID();
@@ -186,9 +196,9 @@ public class CheckpointStateRestoreTest {
final long checkpointId = pending.getCheckpointId();
// the difference to the test "testSetState" is that one stateful subtask does not report state
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, serializedState, 0));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, serializedState, serializedKeyGroupStates));
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, serializedState, 0));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, serializedState, serializedKeyGroupStates));
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 634e177..6182ffd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -21,8 +21,9 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.CheckpointMessagesTest;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -107,8 +108,6 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
// The ZooKeeper implementation discards asynchronously
expected[i - 1].awaitDiscard();
assertTrue(expected[i - 1].isDiscarded());
- assertEquals(userClassLoader, expected[i - 1].getDiscardClassLoader());
-
assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints());
}
}
@@ -183,7 +182,6 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
// The ZooKeeper implementation discards asynchronously
checkpoint.awaitDiscard();
assertTrue(checkpoint.isDiscarded());
- assertEquals(userClassLoader, checkpoint.getDiscardClassLoader());
}
}
@@ -199,14 +197,14 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
JobVertexID jvid = new JobVertexID();
Map<JobVertexID, TaskState> taskGroupStates = new HashMap<>();
- TaskState taskState = new TaskState(jvid, numberOfStates);
+ TaskState taskState = new TaskState(jvid, numberOfStates, numberOfStates);
taskGroupStates.put(jvid, taskState);
for (int i = 0; i < numberOfStates; i++) {
- SerializedValue<StateHandle<?>> stateHandle = new SerializedValue<StateHandle<?>>(
+ ChainedStateHandle<StreamStateHandle> stateHandle = CheckpointCoordinatorTest.generateChainedStateHandle(
new CheckpointMessagesTest.MyHandle());
- taskState.putState(i, new SubtaskState(stateHandle, 0, 0));
+ taskState.putState(i, new SubtaskState(stateHandle, 0));
}
return new TestCompletedCheckpoint(new JobID(), id, 0, taskGroupStates);
@@ -230,8 +228,6 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
// Latch for test variants which discard asynchronously
private transient final CountDownLatch discardLatch = new CountDownLatch(1);
- private transient ClassLoader discardClassLoader;
-
public TestCompletedCheckpoint(
JobID jobId,
long checkpointId,
@@ -242,11 +238,10 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
}
@Override
- public void discard(ClassLoader userClassLoader) throws Exception {
- super.discard(userClassLoader);
+ public void discardState() throws Exception {
+ super.discardState();
if (!isDiscarded) {
- this.discardClassLoader = userClassLoader;
this.isDiscarded = true;
if (discardLatch != null) {
@@ -265,10 +260,6 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
}
}
- public ClassLoader getDiscardClassLoader() {
- return discardClassLoader;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) return true;
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 90a6836..9b04244 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -45,14 +45,14 @@ public class CompletedCheckpointTest {
// Verify discard call is forwarded to state
CompletedCheckpoint checkpoint = new CompletedCheckpoint(new JobID(), 0, 0, 1, taskStates, true);
- checkpoint.discard(ClassLoader.getSystemClassLoader());
- verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+ checkpoint.discardState();
+ verify(state, times(1)).discardState();
Mockito.reset(state);
// Verify discard call is not forwarded to state
checkpoint = new CompletedCheckpoint(new JobID(), 0, 0, 1, taskStates, false);
- checkpoint.discard(ClassLoader.getSystemClassLoader());
- verify(state, times(0)).discard(Matchers.any(ClassLoader.class));
+ checkpoint.discardState();
+ verify(state, times(0)).discardState();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index d235e61..fd4e02d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -66,7 +66,7 @@ public class PendingCheckpointTest {
setTaskState(pending, state);
pending.abortDeclined();
- verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+ verify(state, times(1)).discardState();
// Abort error
Mockito.reset(state);
@@ -75,7 +75,7 @@ public class PendingCheckpointTest {
setTaskState(pending, state);
pending.abortError(new Exception("Expected Test Exception"));
- verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+ verify(state, times(1)).discardState();
// Abort expired
Mockito.reset(state);
@@ -84,7 +84,7 @@ public class PendingCheckpointTest {
setTaskState(pending, state);
pending.abortExpired();
- verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+ verify(state, times(1)).discardState();
// Abort subsumed
Mockito.reset(state);
@@ -93,7 +93,7 @@ public class PendingCheckpointTest {
setTaskState(pending, state);
pending.abortSubsumed();
- verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+ verify(state, times(1)).discardState();
}
/**
@@ -106,21 +106,20 @@ public class PendingCheckpointTest {
PendingCheckpoint pending = createPendingCheckpoint();
PendingCheckpointTest.setTaskState(pending, state);
- pending.acknowledgeTask(ATTEMPT_ID, null, 0, null);
+ pending.acknowledgeTask(ATTEMPT_ID, null, null);
CompletedCheckpoint checkpoint = pending.finalizeCheckpoint();
// Does discard state
- checkpoint.discard(ClassLoader.getSystemClassLoader());
- verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+ checkpoint.discardState();
+ verify(state, times(1)).discardState();
}
// ------------------------------------------------------------------------
private static PendingCheckpoint createPendingCheckpoint() {
- ClassLoader classLoader = ClassLoader.getSystemClassLoader();
Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS);
- return new PendingCheckpoint(new JobID(), 0, 1, ackTasks, classLoader);
+ return new PendingCheckpoint(new JobID(), 0, 1, ackTasks);
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
index 6ae6e1c..7258545 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
@@ -71,7 +71,7 @@ public class PendingSavepointTest {
PendingCheckpointTest.setTaskState(pending, state);
pending.abortDeclined();
- verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+ verify(state, times(1)).discardState();
// Abort error
Mockito.reset(state);
@@ -81,7 +81,7 @@ public class PendingSavepointTest {
Future<String> future = pending.getCompletionFuture();
pending.abortError(new Exception("Expected Test Exception"));
- verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+ verify(state, times(1)).discardState();
assertTrue(future.failed().isCompleted());
// Abort expired
@@ -92,7 +92,7 @@ public class PendingSavepointTest {
future = pending.getCompletionFuture();
pending.abortExpired();
- verify(state, times(1)).discard(Matchers.any(ClassLoader.class));
+ verify(state, times(1)).discardState();
assertTrue(future.failed().isCompleted());
// Abort subsumed
@@ -117,13 +117,13 @@ public class PendingSavepointTest {
Future<String> future = pending.getCompletionFuture();
- pending.acknowledgeTask(ATTEMPT_ID, null, 0, null);
+ pending.acknowledgeTask(ATTEMPT_ID, null, null);
CompletedCheckpoint checkpoint = pending.finalizeCheckpoint();
// Does _NOT_ discard state
- checkpoint.discard(ClassLoader.getSystemClassLoader());
- verify(state, times(0)).discard(Matchers.any(ClassLoader.class));
+ checkpoint.discardState();
+ verify(state, times(0)).discardState();
// Future is completed
String path = Await.result(future, Duration.Zero());
@@ -133,9 +133,8 @@ public class PendingSavepointTest {
// ------------------------------------------------------------------------
private static PendingSavepoint createPendingSavepoint() {
- ClassLoader classLoader = ClassLoader.getSystemClassLoader();
Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS);
- return new PendingSavepoint(new JobID(), 0, 1, ackTasks, classLoader, new HeapSavepointStore());
+ return new PendingSavepoint(new JobID(), 0, 1, ackTasks, new HeapSavepointStore());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 380ba2c..f273797 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -19,9 +19,8 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.junit.AfterClass;
import org.junit.Before;
@@ -29,6 +28,8 @@ import org.junit.Test;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
+import java.io.IOException;
+import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@@ -61,10 +62,10 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
int maxNumberOfCheckpointsToRetain, ClassLoader userLoader) throws Exception {
return new ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, userLoader,
- ZooKeeper.createClient(), CheckpointsPath, new StateStorageHelper<CompletedCheckpoint>() {
+ ZooKeeper.createClient(), CheckpointsPath, new RetrievableStateStorageHelper<CompletedCheckpoint>() {
@Override
- public StateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception {
- return new LocalStateHandle<>(state);
+ public RetrievableStateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception {
+ return new HeapRetrievableStateHandle<CompletedCheckpoint>(state);
}
});
}
@@ -160,4 +161,35 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
CompletedCheckpoint recovered = store.getLatestCheckpoint();
assertEquals(checkpoint, recovered);
}
+
+ static class HeapRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
+
+ private static final long serialVersionUID = -268548467968932L;
+
+ public HeapRetrievableStateHandle(T state) {
+ this.state = state;
+ }
+
+ private T state;
+
+ @Override
+ public T retrieveState() throws Exception {
+ return state;
+ }
+
+ @Override
+ public void discardState() throws Exception {
+ state = null;
+ }
+
+ @Override
+ public long getStateSize() throws Exception {
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
index 6b8c651..3e2de80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
@@ -58,7 +58,7 @@ public class FsSavepointStoreTest {
assertEquals(0, tmp.getRoot().listFiles().length);
// Store
- SavepointV0 stored = new SavepointV0(1929292, SavepointV0Test.createTaskStates(4, 24));
+ SavepointV1 stored = new SavepointV1(1929292, SavepointV1Test.createTaskStates(4, 24));
String path = store.storeSavepoint(stored);
assertEquals(1, tmp.getRoot().listFiles().length);
@@ -67,7 +67,7 @@ public class FsSavepointStoreTest {
assertEquals(stored, loaded);
// Dispose
- store.disposeSavepoint(path, ClassLoader.getSystemClassLoader());
+ store.disposeSavepoint(path);
assertEquals(0, tmp.getRoot().listFiles().length);
}
@@ -122,7 +122,7 @@ public class FsSavepointStoreTest {
assertEquals(1, tmp.getRoot().listFiles().length);
// Savepoint v0
- Savepoint savepoint = new SavepointV0(checkpointId, SavepointV0Test.createTaskStates(4, 32));
+ Savepoint savepoint = new SavepointV1(checkpointId, SavepointV1Test.createTaskStates(4, 32));
String pathSavepoint = store.storeSavepoint(savepoint);
assertEquals(2, tmp.getRoot().listFiles().length);
@@ -208,7 +208,7 @@ public class FsSavepointStoreTest {
}
@Override
- public void dispose(ClassLoader classLoader) {
+ public void dispose() {
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index 6a85195..d703bd6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -65,7 +65,7 @@ public class SavepointLoaderTest {
true);
// Store savepoint
- SavepointV0 savepoint = new SavepointV0(stored.getCheckpointID(), taskStates.values());
+ SavepointV1 savepoint = new SavepointV1(stored.getCheckpointID(), taskStates.values());
SavepointStore store = new HeapSavepointStore();
String path = store.storeSavepoint(savepoint);
@@ -84,8 +84,8 @@ public class SavepointLoaderTest {
assertEquals(stored.getCheckpointID(), loaded.getCheckpointID());
// The loaded checkpoint should not discard state when its discarded
- loaded.discard(ClassLoader.getSystemClassLoader());
- verify(state, times(0)).discard(any(ClassLoader.class));
+ loaded.discardState();
+ verify(state, times(0)).discardState();
// 2) Load and validate: parallelism mismatch
when(vertex.getParallelism()).thenReturn(222);
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java
deleted file mode 100644
index b656d90..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-
-import static org.junit.Assert.assertEquals;
-
-public class SavepointV0SerializerTest {
-
- /**
- * Test serialization of {@link SavepointV0} instance.
- */
- @Test
- public void testSerializeDeserializeV1() throws Exception {
- SavepointV0 expected = new SavepointV0(123123, SavepointV0Test.createTaskStates(8, 32));
-
- SavepointV0Serializer serializer = SavepointV0Serializer.INSTANCE;
-
- // Serialize
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- serializer.serialize(expected, new DataOutputViewStreamWrapper(baos));
- byte[] bytes = baos.toByteArray();
-
- // Deserialize
- ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
- Savepoint actual = serializer.deserialize(new DataInputViewStreamWrapper(bais));
-
- assertEquals(expected, actual);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java
deleted file mode 100644
index 4d72c42..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.CheckpointMessagesTest;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class SavepointV0Test {
-
- /**
- * Simple test of savepoint methods.
- */
- @Test
- public void testSavepointV0() throws Exception {
- long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE);
- int numTaskStates = 4;
- int numSubtaskStates = 16;
-
- Collection<TaskState> expected = createTaskStates(numTaskStates, numSubtaskStates);
-
- SavepointV0 savepoint = new SavepointV0(checkpointId, expected);
-
- assertEquals(SavepointV0.VERSION, savepoint.getVersion());
- assertEquals(checkpointId, savepoint.getCheckpointId());
- assertEquals(expected, savepoint.getTaskStates());
-
- assertFalse(savepoint.getTaskStates().isEmpty());
- savepoint.dispose(ClassLoader.getSystemClassLoader());
- assertTrue(savepoint.getTaskStates().isEmpty());
- }
-
- static Collection<TaskState> createTaskStates(int numTaskStates, int numSubtaskStates) throws IOException {
- List<TaskState> taskStates = new ArrayList<>(numTaskStates);
-
- for (int i = 0; i < numTaskStates; i++) {
- TaskState taskState = new TaskState(new JobVertexID(), numSubtaskStates);
- for (int j = 0; j < numSubtaskStates; j++) {
- SerializedValue<StateHandle<?>> stateHandle = new SerializedValue<StateHandle<?>>(
- new CheckpointMessagesTest.MyHandle());
-
- taskState.putState(i, new SubtaskState(stateHandle, 0, 0));
- }
-
- taskStates.add(taskState);
- }
-
- return taskStates;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
new file mode 100644
index 0000000..bad836b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+
+import static org.junit.Assert.assertEquals;
+
+public class SavepointV1SerializerTest {
+
+ /**
+ * Test serialization of {@link SavepointV1} instance.
+ */
+ @Test
+ public void testSerializeDeserializeV1() throws Exception {
+ SavepointV1 expected = new SavepointV1(123123, SavepointV1Test.createTaskStates(8, 32));
+
+ SavepointV1Serializer serializer = SavepointV1Serializer.INSTANCE;
+
+ // Serialize
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ serializer.serialize(expected, new DataOutputViewStreamWrapper(baos));
+ byte[] bytes = baos.toByteArray();
+
+ // Deserialize
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ Savepoint actual = serializer.deserialize(new DataInputViewStreamWrapper(bais));
+
+ assertEquals(expected, actual);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
new file mode 100644
index 0000000..ef10032
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SavepointV1Test {
+
+ /**
+ * Simple test of savepoint methods.
+ */
+ @Test
+ public void testSavepointV1() throws Exception {
+ long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE);
+ int numTaskStates = 4;
+ int numSubtaskStates = 16;
+
+ Collection<TaskState> expected = createTaskStates(numTaskStates, numSubtaskStates);
+
+ SavepointV1 savepoint = new SavepointV1(checkpointId, expected);
+
+ assertEquals(SavepointV1.VERSION, savepoint.getVersion());
+ assertEquals(checkpointId, savepoint.getCheckpointId());
+ assertEquals(expected, savepoint.getTaskStates());
+
+ assertFalse(savepoint.getTaskStates().isEmpty());
+ savepoint.dispose();
+ assertTrue(savepoint.getTaskStates().isEmpty());
+ }
+
+ static Collection<TaskState> createTaskStates(int numTaskStates, int numSubtaskStates) throws IOException {
+ List<TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+ for (int i = 0; i < numTaskStates; i++) {
+ TaskState taskState = new TaskState(new JobVertexID(), numSubtaskStates, numSubtaskStates);
+ for (int j = 0; j < numSubtaskStates; j++) {
+ StreamStateHandle stateHandle = new ByteStreamStateHandle("Hello".getBytes());
+ taskState.putState(i, new SubtaskState(
+ new ChainedStateHandle<>(Collections.singletonList(stateHandle)), 0));
+ }
+
+ taskState.putKeyedState(
+ 0,
+ new KeyGroupsStateHandle(
+ new KeyGroupRangeOffsets(1,1, new long[] {42}), new ByteStreamStateHandle("Hello".getBytes())));
+
+ taskStates.add(taskState);
+ }
+
+ return taskStates;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
index 12bbf82..c513e26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
@@ -19,17 +19,18 @@
package org.apache.flink.runtime.checkpoint.stats;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.junit.Test;
-import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
@@ -51,7 +52,7 @@ import static org.mockito.Mockito.when;
public class SimpleCheckpointStatsTrackerTest {
private static final Random RAND = new Random();
-
+
@Test
public void testNoCompletedCheckpointYet() throws Exception {
CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(
@@ -154,7 +155,7 @@ public class SimpleCheckpointStatsTrackerTest {
private static void verifyJobStats(
CheckpointStatsTracker tracker,
int historySize,
- CompletedCheckpoint[] checkpoints) {
+ CompletedCheckpoint[] checkpoints) throws Exception {
assertTrue(tracker.getJobStats().isDefined());
JobCheckpointStats jobStats = tracker.getJobStats().get();
@@ -275,14 +276,15 @@ public class SimpleCheckpointStatsTrackerTest {
}
private static CompletedCheckpoint[] generateRandomCheckpoints(
- int numCheckpoints) throws IOException {
+ int numCheckpoints) throws Exception {
// Config
JobID jobId = new JobID();
int minNumOperators = 4;
int maxNumOperators = 32;
- int minParallelism = 4;
- int maxParallelism = 16;
+ int minOperatorParallelism = 4;
+ int maxOperatorParallelism = 16;
+ int maxParallelism = 32;
// Use yuge numbers here in order to test that summing up state sizes
// does not overflow. This was a bug in the initial version, because
@@ -299,7 +301,7 @@ public class SimpleCheckpointStatsTrackerTest {
for (int i = 0; i < numOperators; i++) {
operatorIds[i] = new JobVertexID();
- operatorParallelism[i] = RAND.nextInt(maxParallelism - minParallelism + 1) + minParallelism;
+ operatorParallelism[i] = RAND.nextInt(maxOperatorParallelism - minOperatorParallelism + 1) + minOperatorParallelism;
}
// Generate checkpoints
@@ -317,7 +319,7 @@ public class SimpleCheckpointStatsTrackerTest {
JobVertexID operatorId = operatorIds[operatorIndex];
int parallelism = operatorParallelism[operatorIndex];
- TaskState taskState = new TaskState(operatorId, parallelism);
+ TaskState taskState = new TaskState(operatorId, parallelism, maxParallelism);
taskGroupStates.put(operatorId, taskState);
@@ -328,9 +330,11 @@ public class SimpleCheckpointStatsTrackerTest {
completionDuration = duration;
}
+ final long proxySize = minStateSize + ((long) (RAND.nextDouble() * (maxStateSize - minStateSize)));
+ StreamStateHandle proxy = new StateHandleProxy(new Path(), proxySize);
+
SubtaskState subtaskState = new SubtaskState(
- new SerializedValue<StateHandle<?>>(null),
- minStateSize + ((long) (RAND.nextDouble() * (maxStateSize - minStateSize))),
+ new ChainedStateHandle<>(Arrays.asList(proxy)),
duration);
taskState.putState(subtaskIndex, subtaskState);
@@ -356,10 +360,32 @@ public class SimpleCheckpointStatsTrackerTest {
ExecutionJobVertex v = mock(ExecutionJobVertex.class);
when(v.getJobVertexId()).thenReturn(operatorId);
when(v.getParallelism()).thenReturn(parallelism);
-
+
jobVertices.add(v);
}
return jobVertices;
}
+
+ private static class StateHandleProxy extends FileStateHandle {
+
+ private static final long serialVersionUID = 35356735683568L;
+
+ public StateHandleProxy(Path filePath, long proxySize) {
+ super(filePath);
+ this.proxySize = proxySize;
+ }
+
+ private long proxySize;
+
+ @Override
+ public void discardState() throws Exception {
+
+ }
+
+ @Override
+ public long getStateSize() {
+ return proxySize;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 0e1c7c5..6b80c3d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -53,8 +53,11 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
@@ -62,6 +65,7 @@ import org.apache.flink.runtime.testingUtils.TestingMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManager;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.InstantiationUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -141,38 +145,38 @@ public class JobManagerHARecoveryTest {
instanceManager.addInstanceListener(scheduler);
archive = system.actorOf(Props.create(
- MemoryArchivist.class,
- 10), "archive");
+ MemoryArchivist.class,
+ 10), "archive");
Props jobManagerProps = Props.create(
- TestingJobManager.class,
- flinkConfiguration,
- new ForkJoinPool(),
- instanceManager,
- scheduler,
- new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000),
- archive,
- new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
- timeout,
- myLeaderElectionService,
- mySubmittedJobGraphStore,
- checkpointStateFactory,
- new HeapSavepointStore(),
- jobRecoveryTimeout,
- Option.apply(null));
+ TestingJobManager.class,
+ flinkConfiguration,
+ new ForkJoinPool(),
+ instanceManager,
+ scheduler,
+ new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000),
+ archive,
+ new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
+ timeout,
+ myLeaderElectionService,
+ mySubmittedJobGraphStore,
+ checkpointStateFactory,
+ new HeapSavepointStore(),
+ jobRecoveryTimeout,
+ Option.apply(null));
jobManager = system.actorOf(jobManagerProps, "jobmanager");
ActorGateway gateway = new AkkaActorGateway(jobManager, leaderSessionID);
taskManager = TaskManager.startTaskManagerComponentsAndActor(
- flinkConfiguration,
- ResourceID.generate(),
- system,
- "localhost",
- Option.apply("taskmanager"),
- Option.apply((LeaderRetrievalService) myLeaderRetrievalService),
- true,
- TestingTaskManager.class);
+ flinkConfiguration,
+ ResourceID.generate(),
+ system,
+ "localhost",
+ Option.apply("taskmanager"),
+ Option.apply((LeaderRetrievalService) myLeaderRetrievalService),
+ true,
+ TestingTaskManager.class);
ActorGateway tmGateway = new AkkaActorGateway(taskManager, leaderSessionID);
@@ -199,12 +203,12 @@ public class JobManagerHARecoveryTest {
BlockingStatefulInvokable.initializeStaticHelpers(slots);
Future<Object> isLeader = gateway.ask(
- TestingJobManagerMessages.getNotifyWhenLeader(),
- deadline.timeLeft());
+ TestingJobManagerMessages.getNotifyWhenLeader(),
+ deadline.timeLeft());
Future<Object> isConnectedToJobManager = tmGateway.ask(
- new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager),
- deadline.timeLeft());
+ new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager),
+ deadline.timeLeft());
// tell jobManager that he's the leader
myLeaderElectionService.isLeader(leaderSessionID);
@@ -216,8 +220,8 @@ public class JobManagerHARecoveryTest {
// submit blocking job
Future<Object> jobSubmitted = gateway.ask(
- new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
- deadline.timeLeft());
+ new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
+ deadline.timeLeft());
Await.ready(jobSubmitted, deadline.timeLeft());
@@ -298,7 +302,7 @@ public class JobManagerHARecoveryTest {
public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
checkpoints.addLast(checkpoint);
if (checkpoints.size() > 1) {
- checkpoints.removeFirst().discard(ClassLoader.getSystemClassLoader());
+ checkpoints.removeFirst().discardState();
}
}
@@ -342,10 +346,12 @@ public class JobManagerHARecoveryTest {
}
@Override
- public void start() {}
+ public void start() {
+ }
@Override
- public void stop() {}
+ public void stop() {
+ }
@Override
public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader) throws Exception {
@@ -408,7 +414,7 @@ public class JobManagerHARecoveryTest {
@Override
public void invoke() throws Exception {
- while(blocking) {
+ while (blocking) {
synchronized (lock) {
lock.wait();
}
@@ -424,7 +430,7 @@ public class JobManagerHARecoveryTest {
}
}
- public static class BlockingStatefulInvokable extends BlockingInvokable implements StatefulTask<StateHandle<Long>> {
+ public static class BlockingStatefulInvokable extends BlockingInvokable implements StatefulTask {
private static final int NUM_CHECKPOINTS_TO_COMPLETE = 5;
@@ -435,18 +441,28 @@ public class JobManagerHARecoveryTest {
private int completedCheckpoints = 0;
@Override
- public void setInitialState(StateHandle<Long> stateHandle) throws Exception {
+ public void setInitialState(ChainedStateHandle<StreamStateHandle> chainedState, List<KeyGroupsStateHandle> keyGroupsState) throws Exception {
int subtaskIndex = getIndexInSubtaskGroup();
if (subtaskIndex < recoveredStates.length) {
- recoveredStates[subtaskIndex] = stateHandle.getState(getUserCodeClassLoader());
+ recoveredStates[subtaskIndex] = InstantiationUtil.deserializeObject(chainedState.get(0).openInputStream());
}
}
@Override
public boolean triggerCheckpoint(long checkpointId, long timestamp) {
- StateHandle<Long> state = new LocalStateHandle<>(checkpointId);
- getEnvironment().acknowledgeCheckpoint(checkpointId, state);
- return true;
+ try {
+ ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(
+ InstantiationUtil.serializeObject(checkpointId));
+ RetrievableStreamStateHandle<Long> state = new RetrievableStreamStateHandle<Long>(byteStreamStateHandle);
+ ChainedStateHandle<StreamStateHandle> chainedStateHandle = new ChainedStateHandle<StreamStateHandle>(Collections.singletonList(state));
+ getEnvironment().acknowledgeCheckpoint(
+ checkpointId,
+ chainedStateHandle,
+ Collections.<KeyGroupsStateHandle>emptyList());
+ return true;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index 426dfba..6ef184d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -19,16 +19,17 @@
package org.apache.flink.runtime.jobmanager;
import akka.actor.ActorRef;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Before;
@@ -36,6 +37,7 @@ import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -57,10 +59,12 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
- private final static StateStorageHelper<SubmittedJobGraph> localStateStorage = new StateStorageHelper<SubmittedJobGraph>() {
+ private final static RetrievableStateStorageHelper<SubmittedJobGraph> localStateStorage = new RetrievableStateStorageHelper<SubmittedJobGraph>() {
@Override
- public StateHandle<SubmittedJobGraph> store(SubmittedJobGraph state) throws Exception {
- return new LocalStateHandle<>(state);
+ public RetrievableStateHandle<SubmittedJobGraph> store(SubmittedJobGraph state) throws IOException {
+ ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(
+ InstantiationUtil.serializeObject(state));
+ return new RetrievableStreamStateHandle<SubmittedJobGraph>(byteStreamStateHandle);
}
};
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 73bf204..c6eb249 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -18,32 +18,38 @@
package org.apache.flink.runtime.messages;
-import static org.junit.Assert.*;
-
import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.junit.Test;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
public class CheckpointMessagesTest {
-
+
@Test
public void testTriggerAndConfirmCheckpoint() {
try {
NotifyCheckpointComplete cc = new NotifyCheckpointComplete(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L);
testSerializabilityEqualsHashCode(cc);
-
+
TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L);
testSerializabilityEqualsHashCode(tc);
-
+
}
catch (Exception e) {
e.printStackTrace();
@@ -55,35 +61,40 @@ public class CheckpointMessagesTest {
public void testConfirmTaskCheckpointed() {
try {
AcknowledgeCheckpoint noState = new AcknowledgeCheckpoint(
- new JobID(), new ExecutionAttemptID(), 569345L);
+ new JobID(), new ExecutionAttemptID(), 569345L);
+
+ KeyGroupRange keyGroupRange = KeyGroupRange.of(42, 42);
AcknowledgeCheckpoint withState = new AcknowledgeCheckpoint(
- new JobID(), new ExecutionAttemptID(), 87658976143L,
- new SerializedValue<StateHandle<?>>(new MyHandle()), 0);
-
+ new JobID(),
+ new ExecutionAttemptID(),
+ 87658976143L,
+ CheckpointCoordinatorTest.generateChainedStateHandle(new MyHandle()),
+ CheckpointCoordinatorTest.generateKeyGroupState(
+ keyGroupRange, Collections.singletonList(new MyHandle())));
+
testSerializabilityEqualsHashCode(noState);
testSerializabilityEqualsHashCode(withState);
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
-
+
private static void testSerializabilityEqualsHashCode(Serializable o) throws IOException {
Object copy = CommonTestUtils.createCopySerializable(o);
+ System.out.println(o.getClass() + " " + copy.getClass());
assertEquals(o, copy);
assertEquals(o.hashCode(), copy.hashCode());
assertNotNull(o.toString());
assertNotNull(copy.toString());
}
-
- public static class MyHandle implements StateHandle<Serializable> {
+
+ public static class MyHandle implements StreamStateHandle {
private static final long serialVersionUID = 8128146204128728332L;
- @Override
- public Serializable getState(ClassLoader userCodeClassLoader) {
+ public Serializable get(ClassLoader userCodeClassLoader) {
return null;
}
@@ -107,5 +118,10 @@ public class CheckpointMessagesTest {
@Override
public void close() throws IOException {}
+
+ @Override
+ public FSDataInputStream openInputStream() throws Exception {
+ return null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 87540bc..19317f9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -36,10 +36,13 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
@@ -149,10 +152,15 @@ public class DummyEnvironment implements Environment {
}
@Override
- public void acknowledgeCheckpoint(long checkpointId) {}
+ public void acknowledgeCheckpoint(long checkpointId) {
+
+ }
@Override
- public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {}
+ public void acknowledgeCheckpoint(long checkpointId,
+ ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+ List<KeyGroupsStateHandle> keyGroupStateHandles) {
+ }
@Override
public void failExternally(Throwable cause) {
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 7b966c3..2c76399 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -45,7 +45,10 @@ import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.types.Record;
import org.apache.flink.util.MutableObjectIterator;
@@ -96,9 +99,13 @@ public class MockEnvironment implements Environment {
private final int bufferSize;
public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
+ this(taskName, memorySize, inputSplitProvider, bufferSize, new Configuration());
+ }
+
+ public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize, Configuration taskConfiguration) {
this.taskInfo = new TaskInfo(taskName, 0, 1, 0);
this.jobConfiguration = new Configuration();
- this.taskConfiguration = new Configuration();
+ this.taskConfiguration = taskConfiguration;
this.inputs = new LinkedList<InputGate>();
this.outputs = new LinkedList<ResultPartitionWriter>();
@@ -298,7 +305,9 @@ public class MockEnvironment implements Environment {
}
@Override
- public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
+ public void acknowledgeCheckpoint(long checkpointId,
+ ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+ List<KeyGroupsStateHandle> keyGroupStateHandles) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
index ad3339a..40e1852 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
@@ -85,5 +85,15 @@ public class AbstractCloseableHandleTest {
private static final class CloseableHandle extends AbstractCloseableHandle {
private static final long serialVersionUID = 1L;
+
+ @Override
+ public void discardState() throws Exception {
+
+ }
+
+ @Override
+ public long getStateSize() throws Exception {
+ return 0;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index 0f1c0f7..04fa089 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -24,7 +24,8 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
+
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -86,7 +87,12 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
// no file operations should be possible right now
try {
- backend.checkpointStateSerializable("exception train rolling in", 2L, System.currentTimeMillis());
+ FsStateBackend.FsCheckpointStateOutputStream out = backend.createCheckpointStateOutputStream(
+ 2L,
+ System.currentTimeMillis());
+
+ out.write(1);
+ out.closeAndGetHandle();
fail("should fail with an exception");
} catch (IllegalStateException e) {
// supreme!
@@ -114,43 +120,6 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
}
@Test
- public void testSerializableState() {
- File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
- try {
- FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
- backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test-op", IntSerializer.INSTANCE);
-
- File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
-
- String state1 = "dummy state";
- String state2 = "row row row your boat";
- Integer state3 = 42;
-
- StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis());
- StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis());
- StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis());
-
- assertEquals(state1, handle1.getState(getClass().getClassLoader()));
- handle1.discardState();
-
- assertEquals(state2, handle2.getState(getClass().getClassLoader()));
- handle2.discardState();
-
- assertEquals(state3, handle3.getState(getClass().getClassLoader()));
- handle3.discardState();
-
- assertTrue(isDirectoryEmpty(checkpointDir));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- deleteDirectorySilently(tempDir);
- }
- }
-
- @Test
public void testStateOutputStream() {
File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
try {
@@ -185,16 +154,16 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
stream2.write(state2);
stream3.write(state3);
- FileStreamStateHandle handle1 = (FileStreamStateHandle) stream1.closeAndGetHandle();
+ FileStateHandle handle1 = (FileStateHandle) stream1.closeAndGetHandle();
ByteStreamStateHandle handle2 = (ByteStreamStateHandle) stream2.closeAndGetHandle();
ByteStreamStateHandle handle3 = (ByteStreamStateHandle) stream3.closeAndGetHandle();
// use with try-with-resources
- FileStreamStateHandle handle4;
+ StreamStateHandle handle4;
try (AbstractStateBackend.CheckpointStateOutputStream stream4 =
backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) {
stream4.write(state4);
- handle4 = (FileStreamStateHandle) stream4.closeAndGetHandle();
+ handle4 = stream4.closeAndGetHandle();
}
// close before accessing handle
@@ -209,18 +178,18 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
// uh-huh
}
- validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1);
+ validateBytesInStream(handle1.openInputStream(), state1);
handle1.discardState();
assertFalse(isDirectoryEmpty(checkpointDir));
ensureLocalFileDeleted(handle1.getFilePath());
- validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2);
+ validateBytesInStream(handle2.openInputStream(), state2);
handle2.discardState();
- validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3);
+ validateBytesInStream(handle3.openInputStream(), state3);
handle3.discardState();
- validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4);
+ validateBytesInStream(handle4.openInputStream(), state4);
handle4.discardState();
assertTrue(isDirectoryEmpty(checkpointDir));
}