You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/30 12:47:58 UTC
[08/10] flink git commit: [FLINK-4379] [checkpoints] Introduce
rescalable operator state
[FLINK-4379] [checkpoints] Introduce rescalable operator state
This introduces the Operator State Backend, which stores state that is not partitioned
by a key. It replaces the 'Checkpointed' interface.
Additionally, this introduces CheckpointStateHandles as a container for all checkpoint-related state handles.
This closes #2512
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53ed6ada
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53ed6ada
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53ed6ada
Branch: refs/heads/master
Commit: 53ed6adac8cbe6b5dcb692dc9b94970f3ec5887c
Parents: 2afc092
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Aug 31 23:59:27 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 30 12:38:46 2016 +0200
----------------------------------------------------------------------
.../streaming/state/AbstractRocksDBState.java | 6 +-
.../state/RocksDBKeyedStateBackend.java | 75 ++--
.../streaming/state/RocksDBStateBackend.java | 8 +-
.../state/RocksDBAsyncSnapshotTest.java | 12 +-
.../state/RocksDBStateBackendConfigTest.java | 48 ++-
.../api/common/functions/RuntimeContext.java | 125 +-----
.../util/AbstractRuntimeUDFContext.java | 28 +-
.../flink/api/common/state/OperatorState.java | 70 ---
.../flink/api/common/state/ValueState.java | 2 +-
.../java/typeutils/runtime/JavaSerializer.java | 116 +++++
.../flink/hdfstests/FileStateBackendTest.java | 26 +-
.../AbstractCEPBasePatternOperator.java | 3 +-
.../operator/AbstractCEPPatternOperator.java | 2 -
.../AbstractKeyedCEPPatternOperator.java | 2 -
.../checkpoint/CheckpointCoordinator.java | 127 +++++-
.../runtime/checkpoint/CompletedCheckpoint.java | 5 -
.../checkpoint/OperatorStateRepartitioner.java | 42 ++
.../runtime/checkpoint/PendingCheckpoint.java | 95 +++--
.../RoundRobinOperatorStateRepartitioner.java | 190 +++++++++
.../flink/runtime/checkpoint/SubtaskState.java | 9 -
.../flink/runtime/checkpoint/TaskState.java | 79 +++-
.../savepoint/SavepointV1Serializer.java | 97 ++++-
.../deployment/TaskDeploymentDescriptor.java | 50 ++-
.../flink/runtime/execution/Environment.java | 16 +-
.../flink/runtime/executiongraph/Execution.java | 25 +-
.../runtime/executiongraph/ExecutionGraph.java | 2 -
.../runtime/executiongraph/ExecutionVertex.java | 6 +-
.../runtime/jobgraph/tasks/StatefulTask.java | 11 +-
.../checkpoint/AcknowledgeCheckpoint.java | 67 ++-
.../runtime/state/AbstractCloseableHandle.java | 126 ------
.../state/AbstractKeyedStateBackend.java | 342 +++++++++++++++
.../runtime/state/AbstractStateBackend.java | 43 +-
.../flink/runtime/state/ChainedStateHandle.java | 7 +-
.../runtime/state/CheckpointStateHandles.java | 103 +++++
.../flink/runtime/state/ClosableRegistry.java | 84 ++++
.../state/DefaultOperatorStateBackend.java | 215 ++++++++++
.../runtime/state/KeyGroupRangeOffsets.java | 2 +
.../runtime/state/KeyGroupsStateHandle.java | 6 -
.../flink/runtime/state/KeyedStateBackend.java | 301 ++-----------
.../runtime/state/OperatorStateBackend.java | 35 ++
.../runtime/state/OperatorStateHandle.java | 109 +++++
.../flink/runtime/state/OperatorStateStore.java | 47 +++
...artitionableCheckpointStateOutputStream.java | 96 +++++
.../state/RetrievableStreamStateHandle.java | 2 +-
.../flink/runtime/state/SnapshotProvider.java | 45 ++
.../apache/flink/runtime/state/StateObject.java | 6 +-
.../apache/flink/runtime/state/StateUtil.java | 37 --
.../state/filesystem/FileStateHandle.java | 8 +-
.../state/filesystem/FsStateBackend.java | 6 +-
.../state/heap/HeapKeyedStateBackend.java | 210 ++++-----
.../state/memory/ByteStreamStateHandle.java | 13 +-
.../state/memory/MemoryStateBackend.java | 9 +-
.../ActorGatewayCheckpointResponder.java | 11 +-
.../taskmanager/CheckpointResponder.java | 15 +-
.../runtime/taskmanager/RuntimeEnvironment.java | 12 +-
.../apache/flink/runtime/taskmanager/Task.java | 11 +-
.../checkpoint/CheckpointCoordinatorTest.java | 421 +++++++++++++++----
.../checkpoint/CheckpointStateRestoreTest.java | 46 +-
.../CompletedCheckpointStoreTest.java | 2 +-
.../checkpoint/PendingCheckpointTest.java | 2 +-
.../checkpoint/PendingSavepointTest.java | 2 +-
...ZooKeeperCompletedCheckpointStoreITCase.java | 5 -
.../checkpoint/savepoint/SavepointV1Test.java | 20 +-
.../stats/SimpleCheckpointStatsTrackerTest.java | 2 +-
.../jobmanager/JobManagerHARecoveryTest.java | 20 +-
.../messages/CheckpointMessagesTest.java | 17 +-
.../operators/testutils/DummyEnvironment.java | 3 +-
.../operators/testutils/MockEnvironment.java | 3 +-
.../runtime/query/QueryableStateClientTest.java | 4 +-
.../runtime/query/netty/KvStateClientTest.java | 5 +-
.../query/netty/KvStateServerHandlerTest.java | 7 +-
.../runtime/query/netty/KvStateServerTest.java | 4 +-
.../state/AbstractCloseableHandleTest.java | 97 -----
.../runtime/state/FileStateBackendTest.java | 35 +-
.../runtime/state/MemoryStateBackendTest.java | 15 +-
.../runtime/state/OperatorStateBackendTest.java | 155 +++++++
.../runtime/state/StateBackendTestBase.java | 115 ++---
.../FsCheckpointStateOutputStreamTest.java | 16 +-
.../runtime/taskmanager/TaskAsyncCallTest.java | 5 +-
.../ZooKeeperStateHandleStoreITCase.java | 4 -
.../connectors/kafka/FlinkKafkaConsumer08.java | 37 +-
.../connectors/kafka/KafkaConsumer08Test.java | 4 +-
.../connectors/kafka/FlinkKafkaConsumer09.java | 43 +-
.../kafka/FlinkKafkaConsumerBase.java | 182 +++++---
.../kafka/FlinkKafkaProducerBase.java | 27 +-
.../kafka/internals/AbstractFetcher.java | 4 +-
.../kafka/AtLeastOnceProducerTest.java | 13 +-
.../kafka/FlinkKafkaConsumerBaseTest.java | 149 ++++++-
.../connectors/kafka/KafkaConsumerTestBase.java | 30 +-
.../kafka/testutils/MockRuntimeContext.java | 10 -
.../streaming/api/checkpoint/Checkpointed.java | 1 +
.../api/checkpoint/CheckpointedFunction.java | 65 +++
.../api/checkpoint/ListCheckpointed.java | 65 +++
.../source/ContinuousFileReaderOperator.java | 5 +-
.../api/operators/AbstractStreamOperator.java | 96 ++++-
.../operators/AbstractUdfStreamOperator.java | 65 ++-
.../operators/StreamCheckpointedOperator.java | 58 +++
.../streaming/api/operators/StreamOperator.java | 43 +-
.../api/operators/StreamingRuntimeContext.java | 32 --
.../operators/GenericWriteAheadSink.java | 25 +-
.../windowing/EvictingWindowOperator.java | 4 +-
.../operators/windowing/WindowOperator.java | 14 +-
.../streaming/runtime/tasks/OperatorChain.java | 50 ++-
.../streaming/runtime/tasks/StreamTask.java | 314 ++++++++------
.../operators/StreamingRuntimeContextTest.java | 8 +-
.../streaming/runtime/io/BarrierBufferTest.java | 8 +-
.../runtime/io/BarrierTrackerTest.java | 13 +-
.../operators/StreamOperatorChainingTest.java | 15 +-
.../tasks/InterruptSensitiveRestoreTest.java | 68 +--
.../runtime/tasks/OneInputStreamTaskTest.java | 82 +++-
.../runtime/tasks/StreamMockEnvironment.java | 3 +-
.../KeyedOneInputStreamOperatorTestHarness.java | 37 +-
.../util/OneInputStreamOperatorTestHarness.java | 24 +-
.../UdfStreamOperatorCheckpointingITCase.java | 16 +-
.../streaming/runtime/StateBackendITCase.java | 11 +-
115 files changed, 3981 insertions(+), 1890 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index e878ad5..9da33ef 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -156,7 +156,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
private void writeKey(K key) throws IOException {
//write key
- int beforeWrite = (int) keySerializationStream.getPosition();
+ int beforeWrite = keySerializationStream.getPosition();
backend.getKeySerializer().serialize(key, keySerializationDateDataOutputView);
if (ambiguousKeyPossible) {
@@ -166,7 +166,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
}
private void writeNameSpace(N namespace) throws IOException {
- int beforeWrite = (int) keySerializationStream.getPosition();
+ int beforeWrite = keySerializationStream.getPosition();
namespaceSerializer.serialize(namespace, keySerializationDateDataOutputView);
if (ambiguousKeyPossible) {
@@ -176,7 +176,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
}
private void writeLengthFrom(int fromPosition) throws IOException {
- int length = (int) (keySerializationStream.getPosition() - fromPosition);
+ int length = keySerializationStream.getPosition() - fromPosition;
writeVariableIntBytes(length);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index d5a96af..126ebd2 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -39,12 +39,12 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.util.InstantiationUtil;
@@ -73,12 +73,12 @@ import java.util.PriorityQueue;
import java.util.concurrent.RunnableFuture;
/**
- * A {@link KeyedStateBackend} that stores its state in {@code RocksDB} and will serialize state to
+ * A {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and will serialize state to
* streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon
* checkpointing. This state backend can store very large state that exceeds memory and spills
- * to disk.
+ * to disk. Except for the snapshotting, this class should be accessed as if it is not threadsafe.
*/
-public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
+public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
@@ -98,9 +98,9 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
private final File instanceRocksDBPath;
/**
- * Lock for protecting cleanup of the RocksDB db. We acquire this when doing asynchronous
- * checkpoints and when disposing the db. Otherwise, the asynchronous snapshot might try
- * iterating over a disposed db.
+ * Lock for protecting cleanup of the RocksDB against the checkpointing thread. We acquire this when doing
+ * asynchronous checkpoints and when disposing the DB. Otherwise, the asynchronous snapshot might try
+ * iterating over a disposed DB. After acquiring the lock, always first check if (db == null).
*/
private final SerializableObject dbDisposeLock = new SerializableObject();
@@ -110,13 +110,13 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
* instance. They all write to this instance but to their own column family.
*/
@GuardedBy("dbDisposeLock")
- protected volatile RocksDB db;
+ protected RocksDB db;
/**
* Information about the k/v states as we create them. This is used to retrieve the
* column family that is used for a state and also for sanity checks when restoring.
*/
- private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> kvStateInformation;
+ private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>> kvStateInformation;
/** Number of bytes required to prefix the key groups. */
private final int keyGroupPrefixBytes;
@@ -187,8 +187,8 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
KeyGroupRange keyGroupRange,
List<KeyGroupsStateHandle> restoreState
) throws Exception {
- this(
- jobId,
+
+ this(jobId,
operatorIdentifier,
userCodeClassLoader,
instanceBasePath,
@@ -210,15 +210,11 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
}
/**
- * @see java.io.Closeable
- *
- * Should only be called by one thread.
- *
- * @throws Exception
+ * Should only be called by one thread, and only after all accesses to the DB have happened.
*/
@Override
- public void close() throws Exception {
- super.close();
+ public void dispose() {
+ super.dispose();
final RocksDB cleanupRockDBReference;
@@ -233,13 +229,17 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
// Dispose decoupled db
if (cleanupRockDBReference != null) {
- for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
+ for (Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> column : kvStateInformation.values()) {
column.f0.dispose();
}
cleanupRockDBReference.dispose();
}
- FileUtils.deleteDirectory(instanceBasePath);
+ try {
+ FileUtils.deleteDirectory(instanceBasePath);
+ } catch (IOException ioex) {
+ LOG.info("Could not delete instace base path for RocksDB: " + instanceBasePath);
+ }
}
public int getKeyGroupPrefixBytes() {
@@ -248,7 +248,7 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
/**
* Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and
- * is also stopped when the backend is closed through {@link #close()}. For each backend, this method must always
+ * is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always
* be called by the same thread.
*
* @param checkpointId The Id of the checkpoint.
@@ -386,13 +386,13 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set.");
outStream = checkpointStreamFactory.
createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp);
+ stateBackend.cancelStreamRegistry.registerClosable(outStream);
outputView = new DataOutputViewStreamWrapper(outStream);
}
/**
* 3) Write the actual data from RocksDB from the time we took the snapshot object in (1).
*
- * @return
* @throws IOException
*/
public void writeDBSnapshot() throws IOException, InterruptedException {
@@ -408,7 +408,8 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
* @throws IOException
*/
public void closeCheckpointStream() throws IOException {
- if(outStream != null) {
+ if (outStream != null) {
+ stateBackend.cancelStreamRegistry.unregisterClosable(outStream);
snapshotResultStateHandle = closeSnapshotStreamAndGetHandle();
}
}
@@ -451,7 +452,7 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
int kvStateId = 0;
//iterate all column families, where each column family holds one k/v state, to write the metadata
- for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : stateBackend.kvStateInformation.entrySet()) {
+ for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>> column : stateBackend.kvStateInformation.entrySet()) {
//be cooperative and check for interruption from time to time in the hot loop
checkInterrupted();
@@ -463,7 +464,7 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
ReadOptions readOptions = new ReadOptions();
readOptions.setSnapshot(snapshot);
RocksIterator iterator = stateBackend.db.newIterator(column.getValue().f0, readOptions);
- kvStateIterators.add(new Tuple2<RocksIterator, Integer>(iterator, kvStateId));
+ kvStateIterators.add(new Tuple2<>(iterator, kvStateId));
++kvStateId;
}
}
@@ -624,15 +625,16 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
throws IOException, RocksDBException, ClassNotFoundException {
try {
currentStateHandleInStream = currentKeyGroupsStateHandle.getStateHandle().openInputStream();
+ rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream);
currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
restoreKVStateMetaData();
restoreKVStateData();
} finally {
- if(currentStateHandleInStream != null) {
+ if (currentStateHandleInStream != null) {
+ rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterClosable(currentStateHandleInStream);
currentStateHandleInStream.close();
}
}
-
}
/**
@@ -652,19 +654,20 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
//restore the empty columns for the k/v states through the metadata
for (int i = 0; i < numColumns; i++) {
- StateDescriptor stateDescriptor = InstantiationUtil.deserializeObject(
+ StateDescriptor<?, ?> stateDescriptor = (StateDescriptor<?, ?>) InstantiationUtil.deserializeObject(
currentStateHandleInStream,
rocksDBKeyedStateBackend.userCodeClassLoader);
- Tuple2<ColumnFamilyHandle, StateDescriptor> columnFamily = rocksDBKeyedStateBackend.
+ Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> columnFamily = rocksDBKeyedStateBackend.
kvStateInformation.get(stateDescriptor.getName());
- if(null == columnFamily) {
+ if (null == columnFamily) {
ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
stateDescriptor.getName().getBytes(), rocksDBKeyedStateBackend.columnOptions);
- columnFamily = new Tuple2<>(rocksDBKeyedStateBackend.db.
- createColumnFamily(columnFamilyDescriptor), stateDescriptor);
+ columnFamily = new Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>(
+ rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor), stateDescriptor);
+
rocksDBKeyedStateBackend.kvStateInformation.put(stateDescriptor.getName(), columnFamily);
}
@@ -727,9 +730,9 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
* <p>This also checks whether the {@link StateDescriptor} for a state matches the one
* that we checkpointed, i.e. is already in the map of column families.
*/
- protected ColumnFamilyHandle getColumnFamily(StateDescriptor descriptor) {
+ protected ColumnFamilyHandle getColumnFamily(StateDescriptor<?, ?> descriptor) {
- Tuple2<ColumnFamilyHandle, StateDescriptor> stateInfo = kvStateInformation.get(descriptor.getName());
+ Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> stateInfo = kvStateInformation.get(descriptor.getName());
if (stateInfo != null) {
if (!stateInfo.f1.equals(descriptor)) {
@@ -744,7 +747,9 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
try {
ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
- kvStateInformation.put(descriptor.getName(), new Tuple2<>(columnFamily, descriptor));
+ Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> tuple =
+ new Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>(columnFamily, descriptor);
+ kvStateInformation.put(descriptor.getName(), tuple);
return columnFamily;
} catch (RocksDBException e) {
throw new RuntimeException("Error creating ColumnFamilyHandle.", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index b6ce224..a0c980b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -23,11 +23,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
@@ -224,7 +224,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
}
@Override
- public <K> KeyedStateBackend<K> createKeyedStateBackend(
+ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
@@ -251,7 +251,9 @@ public class RocksDBStateBackend extends AbstractStateBackend {
}
@Override
- public <K> KeyedStateBackend<K> restoreKeyedStateBackend(Environment env, JobID jobID,
+ public <K> AbstractKeyedStateBackend<K> restoreKeyedStateBackend(
+ Environment env,
+ JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index c0c9ca1..bccbabc 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -29,10 +29,8 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
@@ -66,7 +64,6 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.Arrays;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
@@ -138,12 +135,11 @@ public class RocksDBAsyncSnapshotTest {
@Override
public void acknowledgeCheckpoint(
long checkpointId,
- ChainedStateHandle<StreamStateHandle> chainedStateHandle,
- List<KeyGroupsStateHandle> keyGroupStateHandles,
+ CheckpointStateHandles checkpointStateHandles,
long synchronousDurationMillis, long asynchronousDurationMillis,
long bytesBufferedInAlignment, long alignmentDurationNanos) {
- super.acknowledgeCheckpoint(checkpointId, chainedStateHandle, keyGroupStateHandles,
+ super.acknowledgeCheckpoint(checkpointId, checkpointStateHandles,
synchronousDurationMillis, asynchronousDurationMillis,
bytesBufferedInAlignment, alignmentDurationNanos);
@@ -156,7 +152,7 @@ public class RocksDBAsyncSnapshotTest {
}
// should be only one k/v state
- assertEquals(1, keyGroupStateHandles.size());
+ assertEquals(1, checkpointStateHandles.getKeyGroupsStateHandle().size());
// we now know that the checkpoint went through
ensureCheckpointLatch.trigger();
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 3b851be..07fc27c 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateRegistry;
-
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.util.OperatingSystem;
@@ -34,7 +33,6 @@ import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-
import org.junit.rules.TemporaryFolder;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
@@ -45,8 +43,18 @@ import java.io.File;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.startsWith;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
@@ -88,13 +96,15 @@ public class RocksDBStateBackendConfigTest {
assertArrayEquals(new String[] { testDir1.getAbsolutePath(), testDir2.getAbsolutePath() }, rocksDbBackend.getDbStoragePaths());
Environment env = getMockEnvironment(new File[] {});
- RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend(env,
- env.getJobID(),
- "test_op",
- IntSerializer.INSTANCE,
- 1,
- new KeyGroupRange(0, 0),
- env.getTaskKvStateRegistry());
+ RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
+ createKeyedStateBackend(
+ env,
+ env.getJobID(),
+ "test_op",
+ IntSerializer.INSTANCE,
+ 1,
+ new KeyGroupRange(0, 0),
+ env.getTaskKvStateRegistry());
File instanceBasePath = keyedBackend.getInstanceBasePath();
@@ -142,13 +152,15 @@ public class RocksDBStateBackendConfigTest {
assertNull(rocksDbBackend.getDbStoragePaths());
Environment env = getMockEnvironment(tempDirs);
- RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend(env,
- env.getJobID(),
- "test_op",
- IntSerializer.INSTANCE,
- 1,
- new KeyGroupRange(0, 0),
- env.getTaskKvStateRegistry());
+ RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
+ createKeyedStateBackend(
+ env,
+ env.getJobID(),
+ "test_op",
+ IntSerializer.INSTANCE,
+ 1,
+ new KeyGroupRange(0, 0),
+ env.getTaskKvStateRegistry());
File instanceBasePath = keyedBackend.getInstanceBasePath();
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index a9e8da9..ce513cb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -18,12 +18,8 @@
package org.apache.flink.api.common.functions;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.DoubleCounter;
@@ -33,14 +29,16 @@ import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.MetricGroup;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
/**
* A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
* of the function will have a context through which it can access static contextual information (such as
@@ -347,117 +345,4 @@ public interface RuntimeContext {
*/
@PublicEvolving
<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);
-
- /**
- * Gets the key/value state, which is only accessible if the function is executed on
- * a KeyedStream. Upon calling {@link ValueState#value()}, the key/value state will
- * return the value bound to the key of the element currently processed by the function.
- * Each operator may maintain multiple key/value states, addressed with different names.
- *
- * <p>Because the scope of each value is the key of the currently processed element,
- * and the elements are distributed by the Flink runtime, the system can transparently
- * scale out and redistribute the state and KeyedStream.
- *
- * <p>The following code example shows how to implement a continuous counter that counts
- * how many times elements of a certain key occur, and emits an updated count for that
- * element on each occurrence.
- *
- * <pre>{@code
- * DataStream<MyType> stream = ...;
- * KeyedStream<MyType> keyedStream = stream.keyBy("id");
- *
- * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
- *
- * private State<Long> state;
- *
- * public void open(Configuration cfg) {
- * state = getRuntimeContext().getKeyValueState(Long.class, 0L);
- * }
- *
- * public Tuple2<MyType, Long> map(MyType value) {
- * long count = state.value();
- * state.update(value + 1);
- * return new Tuple2<>(value, count);
- * }
- * });
- *
- * }</pre>
- *
- * <p>This method attempts to deduce the type information from the given type class. If the
- * full type cannot be determined from the class (for example because of generic parameters),
- * the TypeInformation object must be manually passed via
- * {@link #getKeyValueState(String, TypeInformation, Object)}.
- *
- *
- * @param name The name of the key/value state.
- * @param stateType The class of the type that is stored in the state. Used to generate
- * serializers for managed memory and checkpointing.
- * @param defaultState The default state value, returned when the state is accessed and
- * no value has yet been set for the key. May be null.
- *
- * @param <S> The type of the state.
- *
- * @return The key/value state access.
- *
- * @throws UnsupportedOperationException Thrown, if no key/value state is available for the
- * function (function is not part os a KeyedStream).
- *
- * @deprecated Use the more expressive {@link #getState(ValueStateDescriptor)} instead.
- */
- @Deprecated
- @PublicEvolving
- <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);
-
- /**
- * Gets the key/value state, which is only accessible if the function is executed on
- * a KeyedStream. Upon calling {@link ValueState#value()}, the key/value state will
- * return the value bound to the key of the element currently processed by the function.
- * Each operator may maintain multiple key/value states, addressed with different names.
- *
- * <p>Because the scope of each value is the key of the currently processed element,
- * and the elements are distributed by the Flink runtime, the system can transparently
- * scale out and redistribute the state and KeyedStream.
- *
- * <p>The following code example shows how to implement a continuous counter that counts
- * how many times elements of a certain key occur, and emits an updated count for that
- * element on each occurrence.
- *
- * <pre>{@code
- * DataStream<MyType> stream = ...;
- * KeyedStream<MyType> keyedStream = stream.keyBy("id");
- *
- * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
- *
- * private State<Long> state;
- *
- * public void open(Configuration cfg) {
- * state = getRuntimeContext().getKeyValueState(Long.class, 0L);
- * }
- *
- * public Tuple2<MyType, Long> map(MyType value) {
- * long count = state.value();
- * state.update(value + 1);
- * return new Tuple2<>(value, count);
- * }
- * });
- *
- * }</pre>
- *
- * @param name The name of the key/value state.
- * @param stateType The type information for the type that is stored in the state.
- * Used to create serializers for managed memory and checkpoints.
- * @param defaultState The default state value, returned when the state is accessed and
- * no value has yet been set for the key. May be null.
- * @param <S> The type of the state.
- *
- * @return The key/value state access.
- *
- * @throws UnsupportedOperationException Thrown, if no key/value state is available for the
- * function (function is not part os a KeyedStream).
- *
- * @deprecated Use the more expressive {@link #getState(ValueStateDescriptor)} instead.
- */
- @Deprecated
- @PublicEvolving
- <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 6645964..4f559bf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -18,11 +18,6 @@
package org.apache.flink.api.common.functions.util;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.Future;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
@@ -36,15 +31,18 @@ import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Future;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -207,20 +205,4 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}
-
- @Override
- @Deprecated
- @PublicEvolving
- public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
- throw new UnsupportedOperationException(
- "This state is only accessible by functions executed on a KeyedStream");
- }
-
- @Override
- @Deprecated
- @PublicEvolving
- public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
- throw new UnsupportedOperationException(
- "This state is only accessible by functions executed on a KeyedStream");
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
deleted file mode 100644
index ac4ed07..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.state;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.IOException;
-
-/**
- * This state interface abstracts persistent key/value state in streaming programs.
- * The state is accessed and modified by user functions, and checkpointed consistently
- * by the system as part of the distributed snapshots.
- *
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
- * automatically supplied by the system, so the function always sees the value mapped to the
- * key of the current element. That way, the system can handle stream and state partitioning
- * consistently together.
- *
- * @param <T> Type of the value in the operator state
- *
- * @deprecated OperatorState has been replaced by {@link ValueState}.
- */
-@Deprecated
-@PublicEvolving
-public interface OperatorState<T> extends State {
-
- /**
- * Returns the current value for the state. When the state is not
- * partitioned the returned value is the same for all inputs in a given
- * operator instance. If state partitioning is applied, the value returned
- * depends on the current operator input, as the operator maintains an
- * independent state for each partition.
- *
- * @return The operator state value corresponding to the current input.
- *
- * @throws IOException Thrown if the system cannot access the state.
- */
- T value() throws IOException;
-
- /**
- * Updates the operator state accessible by {@link #value()} to the given
- * value. The next time {@link #value()} is called (for the same state
- * partition) the returned state will represent the updated value. When a
- * partitioned state is updated with null, the state for the current key
- * will be removed and the default value is returned on the next access.
- *
- * @param value
- * The new value for the state.
- *
- * @throws IOException Thrown if the system cannot access the state.
- */
- void update(T value) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
index 607cb32..de3250a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
@@ -37,7 +37,7 @@ import java.io.IOException;
* @param <T> Type of the value in the state.
*/
@PublicEvolving
-public interface ValueState<T> extends State, OperatorState<T> {
+public interface ValueState<T> extends State {
/**
* Returns the current value for the state. When the state is not
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
new file mode 100644
index 0000000..4ae00d1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * A {@link TypeSerializer} that reads and writes records with standard Java object
+ * serialization ({@link ObjectOutputStream} / {@link ObjectInputStream}).
+ *
+ * <p>The serializer has no configuration, so all instances are equal to each other.
+ *
+ * @param <T> The type of the records; must be {@link Serializable}.
+ */
+public class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    /** Returns this instance; the serializer keeps no per-instance state. */
+    @Override
+    public TypeSerializer<T> duplicate() {
+        return this;
+    }
+
+    /** Always returns {@code null}. */
+    @Override
+    public T createInstance() {
+        return null;
+    }
+
+    /**
+     * Copies the record by delegating to {@code InstantiationUtil.clone(from)}.
+     *
+     * @throws RuntimeException if cloning fails with an I/O error or a missing class.
+     */
+    @Override
+    public T copy(T from) {
+        try {
+            return InstantiationUtil.clone(from);
+        } catch (IOException | ClassNotFoundException e) {
+            throw new RuntimeException("Could not copy instance of " + from + '.', e);
+        }
+    }
+
+    /** Ignores {@code reuse} and delegates to the single-argument {@code copy}. */
+    @Override
+    public T copy(T from, T reuse) {
+        return copy(from);
+    }
+
+    /** Returns {@code -1}: Java-serialized records do not have a fixed length. */
+    @Override
+    public int getLength() {
+        return -1;
+    }
+
+    /** Writes the record into {@code target} as a Java object stream. */
+    @Override
+    public void serialize(T record, DataOutputView target) throws IOException {
+        ObjectOutputStream oos = new ObjectOutputStream(new DataOutputViewStream(target));
+        oos.writeObject(record);
+        // Flush so that bytes buffered by the ObjectOutputStream reach the target view.
+        oos.flush();
+    }
+
+    /**
+     * Reads one record that was written by {@code serialize}.
+     *
+     * @throws RuntimeException if the class of the serialized record cannot be found.
+     */
+    @Override
+    public T deserialize(DataInputView source) throws IOException {
+        ObjectInputStream ois = new ObjectInputStream(new DataInputViewStream(source));
+
+        try {
+            @SuppressWarnings("unchecked")
+            T value = (T) ois.readObject();
+            return value;
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Could not deserialize object.", e);
+        }
+    }
+
+    /** Ignores {@code reuse} and always deserializes into a new object. */
+    @Override
+    public T deserialize(T reuse, DataInputView source) throws IOException {
+        return deserialize(source);
+    }
+
+    /**
+     * Copies one record from {@code source} to {@code target}.
+     *
+     * <p>{@code serialize} writes no length prefix, so the record cannot be copied as a
+     * length-delimited byte range; it is deserialized and serialized again instead.
+     */
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws IOException {
+        T value = deserialize(source);
+        serialize(value, target);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof JavaSerializer && ((JavaSerializer<?>) obj).canEqual(this);
+    }
+
+    @Override
+    public boolean canEqual(Object obj) {
+        return obj instanceof JavaSerializer;
+    }
+
+    @Override
+    public int hashCode() {
+        return getClass().hashCode();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
index df40998..080485e 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.hdfstests;
import org.apache.commons.io.FileUtils;
-
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.FileStatus;
@@ -31,10 +30,8 @@ import org.apache.flink.runtime.state.StateBackendTestBase;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -243,16 +240,21 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
}
private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
- byte[] holder = new byte[data.length];
- int pos = 0;
- int read;
- while (pos < holder.length && (read = is.read(holder, pos, holder.length - pos)) != -1) {
- pos += read;
- }
+ try {
+ byte[] holder = new byte[data.length];
+
+ int pos = 0;
+ int read;
+ while (pos < holder.length && (read = is.read(holder, pos, holder.length - pos)) != -1) {
+ pos += read;
+ }
- assertEquals("not enough data", holder.length, pos);
- assertEquals("too much data", -1, is.read());
- assertArrayEquals("wrong data", data, holder);
+ assertEquals("not enough data", holder.length, pos);
+ assertEquals("too much data", -1, is.read());
+ assertArrayEquals("wrong data", data, holder);
+ } finally {
+ is.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
index aad408c..2f21346 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.operator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -36,7 +37,7 @@ import java.util.PriorityQueue;
*/
public abstract class AbstractCEPBasePatternOperator<IN, OUT>
extends AbstractStreamOperator<OUT>
- implements OneInputStreamOperator<IN, OUT> {
+ implements OneInputStreamOperator<IN, OUT>, StreamCheckpointedOperator {
private static final long serialVersionUID = -4166778210774160757L;
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index 64ffa2a..10bb6ff 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -104,7 +104,6 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
@Override
public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
- super.snapshotState(out, checkpointId, timestamp);
final ObjectOutputStream oos = new ObjectOutputStream(out);
oos.writeObject(nfa);
@@ -119,7 +118,6 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
@Override
@SuppressWarnings("unchecked")
public void restoreState(FSDataInputStream state) throws Exception {
- super.restoreState(state);
final ObjectInputStream ois = new ObjectInputStream(state);
nfa = (NFA<IN>)ois.readObject();
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 09773a2..07e2662 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -187,7 +187,6 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
@Override
public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
- super.snapshotState(out, checkpointId, timestamp);
DataOutputView ov = new DataOutputViewStreamWrapper(out);
ov.writeInt(keys.size());
@@ -199,7 +198,6 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
@Override
public void restoreState(FSDataInputStream state) throws Exception {
- super.restoreState(state);
DataInputView inputView = new DataInputViewStreamWrapper(state);
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 6a43ddf..4428427 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -34,9 +34,11 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -45,7 +47,9 @@ import scala.concurrent.Future;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -398,9 +402,9 @@ public class CheckpointCoordinator {
return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
}
- final PendingCheckpoint checkpoint = props.isSavepoint() ?
- new PendingSavepoint(job, checkpointID, timestamp, ackTasks, savepointStore) :
- new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);
+ final PendingCheckpoint checkpoint = props.isSavepoint() ?
+ new PendingSavepoint(job, checkpointID, timestamp, ackTasks, savepointStore) :
+ new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);
// schedule the timer that will clean up the expired checkpoints
TimerTask canceller = new TimerTask() {
@@ -627,9 +631,8 @@ public class CheckpointCoordinator {
isPendingCheckpoint = true;
if (checkpoint.acknowledgeTask(
- message.getTaskExecutionId(),
- message.getStateHandle(),
- message.getKeyGroupsStateHandle())) {
+ message.getTaskExecutionId(),
+ message.getCheckpointStateHandles())) {
if (checkpoint.isFullyAcknowledged()) {
completed = checkpoint.finalizeCheckpoint();
@@ -640,7 +643,7 @@ public class CheckpointCoordinator {
if (LOG.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
- for (Map.Entry<JobVertexID, TaskState> entry: completed.getTaskStates().entrySet()) {
+ for (Map.Entry<JobVertexID, TaskState> entry : completed.getTaskStates().entrySet()) {
builder.append("JobVertexID: ").append(entry.getKey()).append(" {").append(entry.getValue()).append("}");
}
@@ -654,8 +657,7 @@ public class CheckpointCoordinator {
triggerQueuedRequests();
}
- }
- else {
+ } else {
// checkpoint did not accept message
LOG.error("Received duplicate or invalid acknowledge message for checkpoint " + checkpointId
+ " , task " + message.getTaskExecutionId());
@@ -790,22 +792,80 @@ public class CheckpointCoordinator {
}
+ int oldParallelism = taskState.getParallelism();
+ int newParallelism = executionJobVertex.getParallelism();
+ boolean parallelismChanged = oldParallelism != newParallelism;
boolean hasNonPartitionedState = taskState.hasNonPartitionedState();
- if (hasNonPartitionedState && taskState.getParallelism() != executionJobVertex.getParallelism()) {
+ if (hasNonPartitionedState && parallelismChanged) {
throw new IllegalStateException("Cannot restore the latest checkpoint because " +
"the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " +
"state and its parallelism changed. The operator" + executionJobVertex.getJobVertexId() +
- " has parallelism " + executionJobVertex.getParallelism() + " whereas the corresponding" +
- "state object has a parallelism of " + taskState.getParallelism());
+ " has parallelism " + newParallelism + " whereas the corresponding" +
+ "state object has a parallelism of " + oldParallelism);
}
List<KeyGroupRange> keyGroupPartitions = createKeyGroupPartitions(
- executionJobVertex.getMaxParallelism(),
- executionJobVertex.getParallelism());
+ executionJobVertex.getMaxParallelism(),
+ newParallelism);
+
+ // operator chain index -> list of the stored partitionable states from all parallel instances
+ @SuppressWarnings("unchecked")
+ List<OperatorStateHandle>[] chainParallelStates =
+ new List[taskState.getChainLength()];
+
+ for (int i = 0; i < oldParallelism; ++i) {
+
+ ChainedStateHandle<OperatorStateHandle> partitionableState =
+ taskState.getPartitionableState(i);
+
+ if (partitionableState != null) {
+ for (int j = 0; j < partitionableState.getLength(); ++j) {
+ OperatorStateHandle opParalleState = partitionableState.get(j);
+ if (opParalleState != null) {
+ List<OperatorStateHandle> opParallelStates =
+ chainParallelStates[j];
+ if (opParallelStates == null) {
+ opParallelStates = new ArrayList<>();
+ chainParallelStates[j] = opParallelStates;
+ }
+ opParallelStates.add(opParalleState);
+ }
+ }
+ }
+ }
+
+ // operator chain index -> lists with collected states (one collection for each parallel subtask)
+ @SuppressWarnings("unchecked")
+ List<Collection<OperatorStateHandle>>[] redistributedParallelStates =
+ new List[taskState.getChainLength()];
+
+ //TODO here we can employ different redistribution strategies for state, e.g. union state. For now we only offer round robin as the default.
+ OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
+
+ for (int i = 0; i < chainParallelStates.length; ++i) {
+ List<OperatorStateHandle> chainOpParallelStates = chainParallelStates[i];
+ if (chainOpParallelStates != null) {
+ //We only redistribute if the parallelism of the operator changed from previous executions
+ if (parallelismChanged) {
+ redistributedParallelStates[i] = repartitioner.repartitionState(
+ chainOpParallelStates,
+ newParallelism);
+ } else {
+ List<Collection<OperatorStateHandle>> repacking = new ArrayList<>(newParallelism);
+ for (OperatorStateHandle operatorStateHandle : chainOpParallelStates) {
+ repacking.add(Collections.singletonList(operatorStateHandle));
+ }
+ redistributedParallelStates[i] = repacking;
+ }
+ }
+ }
int counter = 0;
- for (int i = 0; i < executionJobVertex.getParallelism(); i++) {
+
+ for (int i = 0; i < newParallelism; ++i) {
+
+ // non-partitioned state
ChainedStateHandle<StreamStateHandle> state = null;
if (hasNonPartitionedState) {
@@ -813,25 +873,46 @@ public class CheckpointCoordinator {
if (subtaskState != null) {
// count the number of executions for which we set a state
- counter++;
+ ++counter;
state = subtaskState.getChainedStateHandle();
}
}
+ // partitionable state
+ @SuppressWarnings("unchecked")
+ Collection<OperatorStateHandle>[] ia = new Collection[taskState.getChainLength()];
+ List<Collection<OperatorStateHandle>> subTaskPartitionableState = Arrays.asList(ia);
+
+ for (int j = 0; j < redistributedParallelStates.length; ++j) {
+ List<Collection<OperatorStateHandle>> redistributedParallelState =
+ redistributedParallelStates[j];
+
+ if (redistributedParallelState != null) {
+ subTaskPartitionableState.set(j, redistributedParallelState.get(i));
+ }
+ }
+
+ // key-partitioned state
KeyGroupRange subtaskKeyGroupIds = keyGroupPartitions.get(i);
- List<KeyGroupsStateHandle> subtaskKeyGroupStates = getKeyGroupsStateHandles(
- taskState.getKeyGroupStates(),
- subtaskKeyGroupIds);
+ // Again, we only repartition if the parallelism changed
+ List<KeyGroupsStateHandle> subtaskKeyGroupStates = parallelismChanged ?
+ getKeyGroupsStateHandles(taskState.getKeyGroupStates(), subtaskKeyGroupIds)
+ : Collections.singletonList(taskState.getKeyGroupState(i));
Execution currentExecutionAttempt = executionJobVertex
.getTaskVertices()[i]
.getCurrentExecutionAttempt();
- currentExecutionAttempt.setInitialState(state, subtaskKeyGroupStates);
+ CheckpointStateHandles checkpointStateHandles = new CheckpointStateHandles(
+ state,
+ null/*subTaskPartitionableState*/, //TODO choose the right structure and put redistributed states here
+ subtaskKeyGroupStates);
+
+ currentExecutionAttempt.setInitialState(checkpointStateHandles, subTaskPartitionableState);
}
- if (allOrNothingState && counter > 0 && counter < executionJobVertex.getParallelism()) {
+ if (allOrNothingState && counter > 0 && counter < newParallelism) {
throw new IllegalStateException("The checkpoint contained state only for " +
"a subset of tasks for vertex " + executionJobVertex);
}
@@ -859,7 +940,7 @@ public class CheckpointCoordinator {
for (KeyGroupsStateHandle storedKeyGroup : allKeyGroupsHandles) {
KeyGroupsStateHandle intersection = storedKeyGroup.getKeyGroupIntersection(subtaskKeyGroupIds);
- if(intersection.getNumberOfKeyGroups() > 0) {
+ if (intersection.getNumberOfKeyGroups() > 0) {
subtaskKeyGroupStates.add(intersection);
}
}
@@ -881,7 +962,7 @@ public class CheckpointCoordinator {
public static List<KeyGroupRange> createKeyGroupPartitions(int numberKeyGroups, int parallelism) {
Preconditions.checkArgument(numberKeyGroups >= parallelism);
List<KeyGroupRange> result = new ArrayList<>(parallelism);
- int start = 0;
+
for (int i = 0; i < parallelism; ++i) {
result.add(KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(numberKeyGroups, parallelism, i));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 7cb3916..0d279f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -153,9 +153,4 @@ public class CompletedCheckpoint implements StateObject {
public String toString() {
return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
}
-
- @Override
- public void close() throws IOException {
- StateUtil.bestEffortCloseAllStateObjects(taskStates.values());
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
new file mode 100644
index 0000000..98810f1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.OperatorStateHandle;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Strategy interface for redistributing checkpointed operator state across subtasks when the parallelism changes.
+ */
+public interface OperatorStateRepartitioner {
+
+ /**
+ * Redistributes previously checkpointed operator state over the given number of parallel subtasks.
+ *
+ * @param previousParallelSubtaskStates List of state handles to the parallel subtask states of an operator, as they
+ * have been checkpointed.
+ * @param parallelism The parallelism that we consider for the state redistribution. Determines the size of the
+ * returned list.
+ * @return List with one entry per parallel subtask. Each entry is the collection of state handles that together
+ * make up the new total state for that subtask.
+ */
+ List<Collection<OperatorStateHandle>> repartitionState(
+ List<OperatorStateHandle> previousParallelSubtaskStates,
+ int parallelism);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index d499a5a..2ca9d69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -23,7 +23,9 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
@@ -167,49 +169,80 @@ public class PendingCheckpoint {
}
public boolean acknowledgeTask(
- ExecutionAttemptID attemptID,
- ChainedStateHandle<StreamStateHandle> state,
- List<KeyGroupsStateHandle> keyGroupsState) {
+ ExecutionAttemptID attemptID,
+ CheckpointStateHandles checkpointStateHandles) {
synchronized (lock) {
if (discarded) {
return false;
}
-
- ExecutionVertex vertex = notYetAcknowledgedTasks.remove(attemptID);
- if (vertex != null) {
- if (state != null || keyGroupsState != null) {
- JobVertexID jobVertexID = vertex.getJobvertexId();
+ ExecutionVertex vertex = notYetAcknowledgedTasks.remove(attemptID);
- TaskState taskState;
+ if (vertex != null) {
- if (taskStates.containsKey(jobVertexID)) {
- taskState = taskStates.get(jobVertexID);
- } else {
- taskState = new TaskState(jobVertexID, vertex.getTotalNumberOfParallelSubtasks(), vertex.getMaxParallelism());
- taskStates.put(jobVertexID, taskState);
+ if (checkpointStateHandles != null) {
+ List<KeyGroupsStateHandle> keyGroupsState = checkpointStateHandles.getKeyGroupsStateHandle();
+ ChainedStateHandle<StreamStateHandle> nonPartitionedState =
+ checkpointStateHandles.getNonPartitionedStateHandles();
+ ChainedStateHandle<OperatorStateHandle> partitioneableState =
+ checkpointStateHandles.getPartitioneableStateHandles();
+
+ if (nonPartitionedState != null || partitioneableState != null || keyGroupsState != null) {
+
+ JobVertexID jobVertexID = vertex.getJobvertexId();
+
+ int subtaskIndex = vertex.getParallelSubtaskIndex();
+
+ TaskState taskState;
+
+ if (taskStates.containsKey(jobVertexID)) {
+ taskState = taskStates.get(jobVertexID);
+ } else {
+ //TODO this should go away when we remove chained state, assigning state to operators directly instead
+ int chainLength;
+ if (nonPartitionedState != null) {
+ chainLength = nonPartitionedState.getLength();
+ } else if (partitioneableState != null) {
+ chainLength = partitioneableState.getLength();
+ } else {
+ chainLength = 1;
+ }
+
+ taskState = new TaskState(
+ jobVertexID,
+ vertex.getTotalNumberOfParallelSubtasks(),
+ vertex.getMaxParallelism(),
+ chainLength);
+
+ taskStates.put(jobVertexID, taskState);
+ }
+
+ long duration = System.currentTimeMillis() - checkpointTimestamp;
+
+ if (nonPartitionedState != null) {
+ taskState.putState(
+ subtaskIndex,
+ new SubtaskState(nonPartitionedState, duration));
+ }
+
+ if(partitioneableState != null && !partitioneableState.isEmpty()) {
+ taskState.putPartitionableState(subtaskIndex, partitioneableState);
+ }
+
+ // currently a checkpoint can only contain keyed state
+ // for the head operator
+ if (keyGroupsState != null && !keyGroupsState.isEmpty()) {
+ KeyGroupsStateHandle keyGroupsStateHandle = keyGroupsState.get(0);
+ taskState.putKeyedState(subtaskIndex, keyGroupsStateHandle);
+ }
}
+ }
- long duration = System.currentTimeMillis() - checkpointTimestamp;
+ ++numAcknowledgedTasks;
- if (state != null) {
- taskState.putState(
- vertex.getParallelSubtaskIndex(),
- new SubtaskState(state, duration));
- }
-
- // currently a checkpoint can only contain keyed state
- // for the head operator
- if (keyGroupsState != null && !keyGroupsState.isEmpty()) {
- KeyGroupsStateHandle keyGroupsStateHandle = keyGroupsState.get(0);
- taskState.putKeyedState(vertex.getParallelSubtaskIndex(), keyGroupsStateHandle);
- }
- }
- numAcknowledgedTasks++;
return true;
- }
- else {
+ } else {
return false;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
new file mode 100644
index 0000000..09a35f6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Current default implementation of {@link OperatorStateRepartitioner} that redistributes state in round robin fashion.
+ *
+ * <p>Partitions are first grouped by state name. For each named state, every new subtask is assigned
+ * {@code totalPartitions / parallelism} partitions, and the leftover partitions are handed out one at a time.
+ * The next named state starts its assignment at the first subtask that did not receive a leftover partition.
+ * Partitions that end up on the same subtask and share a stream state handle are merged into one
+ * {@link OperatorStateHandle}.
+ */
+public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepartitioner {
+
+ public static final OperatorStateRepartitioner INSTANCE = new RoundRobinOperatorStateRepartitioner();
+ private static final boolean OPTIMIZE_MEMORY_USE = false; // if true, the input list is cleared and consumed offset arrays are nulled
+
+ @Override
+ public List<Collection<OperatorStateHandle>> repartitionState(
+ List<OperatorStateHandle> previousParallelSubtaskStates,
+ int parallelism) {
+
+ Preconditions.checkNotNull(previousParallelSubtaskStates);
+ Preconditions.checkArgument(parallelism > 0);
+
+ // Reorganize: group by (State Name -> StreamStateHandle + Offsets)
+ Map<String, List<Tuple2<StreamStateHandle, long[]>>> nameToState =
+ groupByStateName(previousParallelSubtaskStates);
+
+ if (OPTIMIZE_MEMORY_USE) {
+ previousParallelSubtaskStates.clear(); // free for GC, at the cost that the old handles are no longer available to the caller
+ }
+
+ // Assemble result from all merge maps; one entry per parallel subtask
+ List<Collection<OperatorStateHandle>> result = new ArrayList<>(parallelism);
+
+ // Do the actual repartitioning for all named states
+ List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList =
+ repartition(nameToState, parallelism);
+
+ for (int i = 0; i < mergeMapList.size(); ++i) {
+ result.add(i, new ArrayList<>(mergeMapList.get(i).values()));
+ }
+
+ return result;
+ }
+
+ /**
+ * Group by the different named states.
+ *
+ * @param previousParallelSubtaskStates the checkpointed operator state handles to regroup.
+ * @return map from state name to the list of (delegate stream state handle, partition offsets) pairs found for
+ * that name, in iteration order of the input list.
+ */
+ private Map<String, List<Tuple2<StreamStateHandle, long[]>>> groupByStateName(
+ List<OperatorStateHandle> previousParallelSubtaskStates) {
+
+ //Reorganize: group by (State Name -> StreamStateHandle + Offsets)
+ Map<String, List<Tuple2<StreamStateHandle, long[]>>> nameToState = new HashMap<>();
+ for (OperatorStateHandle psh : previousParallelSubtaskStates) {
+
+ for (Map.Entry<String, long[]> e : psh.getStateNameToPartitionOffsets().entrySet()) {
+
+ List<Tuple2<StreamStateHandle, long[]>> stateLocations = nameToState.get(e.getKey());
+
+ if (stateLocations == null) {
+ stateLocations = new ArrayList<>();
+ nameToState.put(e.getKey(), stateLocations);
+ }
+
+ stateLocations.add(new Tuple2<>(psh.getDelegateStateHandle(), e.getValue()));
+ }
+ }
+ return nameToState;
+ }
+
+ /**
+ * Repartition all named states.
+ *
+ * @param nameToState map from state name to (stream state handle, partition offsets) pairs; the value of every
+ * entry is set to {@code null} once that named state has been processed.
+ * @param parallelism number of parallel subtasks to distribute the partitions over.
+ * @return list with one map per parallel subtask, keyed by stream state handle, holding the merged
+ * {@link OperatorStateHandle} assigned to that subtask.
+ */
+ private List<Map<StreamStateHandle, OperatorStateHandle>> repartition(
+ Map<String, List<Tuple2<StreamStateHandle, long[]>>> nameToState, int parallelism) {
+
+ // We will use this to merge w.r.t. StreamStateHandles for each parallel subtask inside the maps
+ List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList = new ArrayList<>(parallelism);
+ // Initialize with one empty merge map per parallel subtask
+ for (int i = 0; i < parallelism; ++i) {
+ mergeMapList.add(new HashMap<StreamStateHandle, OperatorStateHandle>());
+ }
+
+ int startParallelOP = 0;
+ // Iterate all named states and repartition one named state at a time per iteration
+ for (Map.Entry<String, List<Tuple2<StreamStateHandle, long[]>>> e : nameToState.entrySet()) {
+
+ List<Tuple2<StreamStateHandle, long[]>> current = e.getValue();
+
+ // Determine actual number of partitions for this named state
+ int totalPartitions = 0;
+ for (Tuple2<StreamStateHandle, long[]> offsets : current) {
+ totalPartitions += offsets.f1.length;
+ }
+
+ // Repartition the state across the parallel operator instances
+ int lstIdx = 0; // index of the handle in 'current' that is consumed next
+ int offsetIdx = 0; // index of the first not yet assigned offset within that handle
+ int baseFraction = totalPartitions / parallelism;
+ int remainder = totalPartitions % parallelism;
+
+ int newStartParallelOp = startParallelOP;
+
+ for (int i = 0; i < parallelism; ++i) {
+
+ // Preparation: calculate the actual index considering wrap around
+ int parallelOpIdx = (i + startParallelOP) % parallelism;
+
+ // Now calculate the number of partitions we will assign to the parallel instance in this round ...
+ int numberOfPartitionsToAssign = baseFraction;
+
+ // ... and distribute odd partitions while we still have some, one at a time
+ if (remainder > 0) {
+ ++numberOfPartitionsToAssign;
+ --remainder;
+ } else if (remainder == 0) {
+ // We are out of odd partitions now and begin our next redistribution round with the current
+ // parallel operator to ensure fair load balance
+ newStartParallelOp = parallelOpIdx;
+ --remainder; // drops to -1 so that this branch is taken only once per named state
+ }
+
+ // Collect the partitions for the parallel instance into this list.
+ // NOTE(review): the list is only appended to below and never read in this method.
+ List<Tuple2<StreamStateHandle, long[]>> parallelOperatorState = new ArrayList<>();
+
+ while (numberOfPartitionsToAssign > 0) {
+ Tuple2<StreamStateHandle, long[]> handleWithOffsets = current.get(lstIdx);
+ long[] offsets = handleWithOffsets.f1;
+ int remaining = offsets.length - offsetIdx;
+ // Repartition offsets: take a slice of the current handle, or all that is left of it
+ long[] offs;
+ if (remaining > numberOfPartitionsToAssign) {
+ offs = Arrays.copyOfRange(offsets, offsetIdx, offsetIdx + numberOfPartitionsToAssign);
+ offsetIdx += numberOfPartitionsToAssign;
+ } else {
+ if (OPTIMIZE_MEMORY_USE) {
+ handleWithOffsets.f1 = null; // GC
+ }
+ offs = Arrays.copyOfRange(offsets, offsetIdx, offsets.length);
+ offsetIdx = 0;
+ ++lstIdx;
+ }
+
+ parallelOperatorState.add(
+ new Tuple2<>(handleWithOffsets.f0, offs));
+
+ numberOfPartitionsToAssign -= remaining; // becomes negative if only part of the handle was needed; the loop then ends
+
+ // As a last step we merge partitions that use the same StreamStateHandle in a single
+ // OperatorStateHandle
+ Map<StreamStateHandle, OperatorStateHandle> mergeMap = mergeMapList.get(parallelOpIdx);
+ OperatorStateHandle psh = mergeMap.get(handleWithOffsets.f0);
+ if (psh == null) {
+ psh = new OperatorStateHandle(handleWithOffsets.f0, new HashMap<String, long[]>());
+ mergeMap.put(handleWithOffsets.f0, psh);
+ }
+ psh.getStateNameToPartitionOffsets().put(e.getKey(), offs);
+ }
+ }
+ startParallelOP = newStartParallelOp;
+ e.setValue(null); // drop the reference to the processed list of this named state
+ }
+ return mergeMapList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
index 9beb233..2aa0491 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
@@ -24,8 +24,6 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -112,11 +110,4 @@ public class SubtaskState implements StateObject {
public String toString() {
return String.format("SubtaskState(Size: %d, Duration: %d, State: %s)", stateSize, duration, chainedStateHandle);
}
-
- @Override
- public void close() throws IOException {
- chainedStateHandle.close();
- }
-
-
}