You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/16 21:06:32 UTC
[3/8] flink git commit: [FLINK-9489] Checkpoint timers as part of
managed keyed state instead of raw keyed state
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 622dd0c..0697463 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -63,14 +63,18 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
@@ -177,7 +181,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private interface StateFactory {
<K, N, SV, S extends State, IS extends S> IS createState(
StateDescriptor<S, SV> stateDesc,
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
RocksDBKeyedStateBackend<K> backend) throws Exception;
}
@@ -224,7 +228,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* Information about the k/v states as we create them. This is used to retrieve the
* column family that is used for a state and also for sanity checks when restoring.
*/
- private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformation;
+ private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation;
/**
* Map of state names to their corresponding restored state meta info.
@@ -256,7 +260,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> snapshotStrategy;
/** Factory for priority queue state. */
- private PriorityQueueSetFactory priorityQueueFactory;
+ private final PriorityQueueSetFactory priorityQueueFactory;
+
+ private RocksDBWriteBatchWrapper writeBatchWrapper;
public RocksDBKeyedStateBackend(
String operatorIdentifier,
@@ -322,7 +328,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.priorityQueueFactory = new RocksDBPriorityQueueSetFactory();
break;
default:
- break;
+ throw new IllegalArgumentException("Unknown priority queue state type: " + priorityQueueStateType);
}
LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
@@ -344,12 +350,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@SuppressWarnings("unchecked")
@Override
public <N> Stream<K> getKeys(String state, N namespace) {
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnInfo = kvStateInformation.get(state);
- if (columnInfo == null) {
+ Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> columnInfo = kvStateInformation.get(state);
+ if (columnInfo == null || !(columnInfo.f1 instanceof RegisteredKeyValueStateBackendMetaInfo)) {
return Stream.empty();
}
- final TypeSerializer<N> namespaceSerializer = (TypeSerializer<N>) columnInfo.f1.getNamespaceSerializer();
+ RegisteredKeyValueStateBackendMetaInfo<N, ?> registeredKeyValueStateBackendMetaInfo =
+ (RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.f1;
+
+ final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
final ByteArrayOutputStreamWithPos namespaceOutputStream = new ByteArrayOutputStreamWithPos(8);
boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
final byte[] nameSpaceBytes;
@@ -396,6 +405,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// working on the disposed object results in SEGFAULTS.
if (db != null) {
+ IOUtils.closeQuietly(writeBatchWrapper);
+
// RocksDB's native memory management requires that *all* CFs (including default) are closed before the
// DB is closed. See:
// https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
@@ -403,16 +414,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
IOUtils.closeQuietly(defaultColumnFamily);
// ... continue with the ones created by Flink...
- for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData :
+ for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> columnMetaData :
kvStateInformation.values()) {
IOUtils.closeQuietly(columnMetaData.f0);
}
- // ... then close the priority queue related resources ...
- if (priorityQueueFactory instanceof AutoCloseable) {
- IOUtils.closeQuietly((AutoCloseable) priorityQueueFactory);
- }
-
// ... and finally close the DB instance ...
IOUtils.closeQuietly(db);
@@ -431,13 +437,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@Nonnull
@Override
- public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+ public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T>
+ create(
@Nonnull String stateName,
- @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
- @Nonnull PriorityComparator<T> elementComparator,
- @Nonnull KeyExtractorFunction<T> keyExtractor) {
-
- return priorityQueueFactory.create(stateName, byteOrderedElementSerializer, elementComparator, keyExtractor);
+ @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+ return priorityQueueFactory.create(stateName, byteOrderedElementSerializer);
}
private void cleanInstanceBasePath() {
@@ -482,6 +486,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@Override
public void restore(Collection<KeyedStateHandle> restoreState) throws Exception {
+
LOG.info("Initializing RocksDB keyed state backend.");
if (LOG.isDebugEnabled()) {
@@ -534,6 +539,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private void createDB() throws IOException {
List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
+ this.writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions);
this.defaultColumnFamily = columnFamilyHandles.get(0);
}
@@ -679,7 +685,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
+ Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> registeredColumn =
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
if (registeredColumn == null) {
@@ -689,8 +695,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
nameBytes,
rocksDBKeyedStateBackend.columnOptions);
- RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
- new RegisteredKeyedBackendStateMetaInfo<>(restoredMetaInfo);
+ RegisteredStateMetaInfoBase stateMetaInfo =
+ RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(restoredMetaInfo);
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
@@ -987,12 +993,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ColumnFamilyHandle columnFamilyHandle,
StateMetaInfoSnapshot stateMetaInfoSnapshot) throws RocksDBException {
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
+ Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> registeredStateMetaInfoEntry =
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
if (null == registeredStateMetaInfoEntry) {
- RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
- new RegisteredKeyedBackendStateMetaInfo<>(stateMetaInfoSnapshot);
+ RegisteredStateMetaInfoBase stateMetaInfo =
+ RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
registeredStateMetaInfoEntry =
new Tuple2<>(
@@ -1037,6 +1043,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
stateBackend.db = restoreDBInfo.db;
stateBackend.defaultColumnFamily = restoreDBInfo.defaultColumnFamilyHandle;
+ stateBackend.writeBatchWrapper =
+ new RocksDBWriteBatchWrapper(stateBackend.db, stateBackend.writeOptions);
for (int i = 0; i < restoreDBInfo.stateMetaInfoSnapshots.size(); ++i) {
getOrRegisterColumnFamilyHandle(
@@ -1061,6 +1069,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
Collections.emptyList(),
columnFamilyHandles);
stateBackend.defaultColumnFamily = columnFamilyHandles.get(0);
+ stateBackend.writeBatchWrapper =
+ new RocksDBWriteBatchWrapper(stateBackend.db, stateBackend.writeOptions);
}
}
@@ -1115,13 +1125,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// extract and store the default column family which is located at the first index
stateBackend.defaultColumnFamily = columnFamilyHandles.remove(0);
+ stateBackend.writeBatchWrapper = new RocksDBWriteBatchWrapper(stateBackend.db, stateBackend.writeOptions);
for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
StateMetaInfoSnapshot stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
- RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
- new RegisteredKeyedBackendStateMetaInfo<>(stateMetaInfoSnapshot);
+ RegisteredStateMetaInfoBase stateMetaInfo =
+ RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
stateBackend.kvStateInformation.put(
stateMetaInfoSnapshot.getName(),
@@ -1290,14 +1301,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* already have a registered entry for that and return it (after some necessary state compatibility checks)
* or create a new one if it does not exist.
*/
- private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tryRegisterKvStateInformation(
+ private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, S>> tryRegisterKvStateInformation(
StateDescriptor<?, S> stateDesc,
TypeSerializer<N> namespaceSerializer) throws StateMigrationException {
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
+ Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo =
kvStateInformation.get(stateDesc.getName());
- RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo;
+ RegisteredKeyValueStateBackendMetaInfo<N, S> newMetaInfo;
if (stateInfo != null) {
@SuppressWarnings("unchecked")
@@ -1308,7 +1319,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
" but its corresponding restored snapshot cannot be found.");
- newMetaInfo = RegisteredKeyedBackendStateMetaInfo.resolveKvStateCompatibility(
+ newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
restoredMetaInfoSnapshot,
namespaceSerializer,
stateDesc);
@@ -1317,13 +1328,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
} else {
String stateName = stateDesc.getName();
- newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+ newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
stateDesc.getType(),
stateName,
namespaceSerializer,
stateDesc.getSerializer());
- ColumnFamilyHandle columnFamily = createColumnFamily(stateName, db);
+ ColumnFamilyHandle columnFamily = createColumnFamily(stateName);
stateInfo = Tuple2.of(columnFamily, newMetaInfo);
kvStateInformation.put(stateDesc.getName(), stateInfo);
@@ -1335,7 +1346,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/**
* Creates a column family handle for use with a k/v state.
*/
- private ColumnFamilyHandle createColumnFamily(String stateName, RocksDB db) {
+ private ColumnFamilyHandle createColumnFamily(String stateName) {
byte[] nameBytes = stateName.getBytes(ConfigConstants.DEFAULT_CHARSET);
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
"The chosen state name 'default' collides with the name of the default column family!");
@@ -1359,7 +1370,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
stateDesc.getClass(), this.getClass());
throw new FlinkRuntimeException(message);
}
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult =
+ Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult =
tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
return stateFactory.createState(stateDesc, registerResult, RocksDBKeyedStateBackend.this);
}
@@ -1382,7 +1393,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
public int numStateEntries() {
int count = 0;
- for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : kvStateInformation.values()) {
+ for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> column : kvStateInformation.values()) {
+ //TODO maybe filter only for k/v states
try (RocksIteratorWrapper rocksIterator = getRocksIterator(db, column.f0)) {
rocksIterator.seekToFirst();
@@ -1405,7 +1417,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@VisibleForTesting
static final class RocksDBMergeIterator implements AutoCloseable {
- private final PriorityQueue<MergeIterator> heap;
+ private final PriorityQueue<RocksDBKeyedStateBackend.MergeIterator> heap;
private final int keyGroupPrefixByteCount;
private boolean newKeyGroup;
private boolean newKVState;
@@ -1763,6 +1775,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();
+ // flush everything into db before taking a snapshot
+ writeBatchWrapper.flush();
+
final RocksDBFullSnapshotOperation<K> snapshotOperation =
new RocksDBFullSnapshotOperation<>(
RocksDBKeyedStateBackend.this,
@@ -1982,7 +1997,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.copiedColumnFamilyHandles = new ArrayList<>(stateBackend.kvStateInformation.size());
- for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tuple2 :
+ for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 :
stateBackend.kvStateInformation.values()) {
// snapshot meta info
this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot());
@@ -2433,7 +2448,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
"assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
// save meta data
- for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
+ for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> stateMetaInfoEntry
: stateBackend.kvStateInformation.entrySet()) {
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
}
@@ -2625,10 +2640,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/**
* Encapsulates the logic and resources in connection with creating priority queue state structures.
*/
- class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory, AutoCloseable {
+ class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
/** Default cache size per key-group. */
- private static final int DEFAULT_CACHES_SIZE = 8 * 1024;
+ private static final int DEFAULT_CACHES_SIZE = 1024;
/** A shared buffer to serialize elements for the priority queue. */
@Nonnull
@@ -2638,68 +2653,47 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@Nonnull
private final DataOutputViewStreamWrapper elementSerializationOutView;
- /** A shared {@link RocksDBWriteBatchWrapper} to batch modifications to priority queues. */
- @Nonnull
- private final RocksDBWriteBatchWrapper writeBatchWrapper;
-
- /** Map to track all column families created to back priority queues. */
- @Nonnull
- private final Map<String, ColumnFamilyHandle> priorityQueueColumnFamilies;
-
- /** The mandatory default column family, so that we can close it later. */
- @Nonnull
- private final ColumnFamilyHandle defaultColumnFamily;
-
- /** Path of the RocksDB instance that holds the priority queues. */
- @Nonnull
- private final File pqInstanceRocksDBPath;
-
- /** RocksDB instance that holds the priority queues. */
- @Nonnull
- private final RocksDB pqDb;
-
- RocksDBPriorityQueueSetFactory() throws IOException {
- this.pqInstanceRocksDBPath = new File(instanceBasePath, "pqdb");
- if (pqInstanceRocksDBPath.exists()) {
- try {
- FileUtils.deleteDirectory(pqInstanceRocksDBPath);
- } catch (IOException ex) {
- LOG.warn("Could not delete instance path for PQ RocksDB: " + pqInstanceRocksDBPath, ex);
- }
- }
- List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
- this.pqDb = openDB(pqInstanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
+ RocksDBPriorityQueueSetFactory() {
this.elementSerializationOutStream = new ByteArrayOutputStreamWithPos();
this.elementSerializationOutView = new DataOutputViewStreamWrapper(elementSerializationOutStream);
- this.writeBatchWrapper = new RocksDBWriteBatchWrapper(pqDb, writeOptions);
- this.defaultColumnFamily = columnFamilyHandles.get(0);
- this.priorityQueueColumnFamilies = new HashMap<>();
}
@Nonnull
@Override
- public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+ public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T>
+ create(
@Nonnull String stateName,
- @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
- @Nonnull PriorityComparator<T> elementPriorityComparator,
- @Nonnull KeyExtractorFunction<T> keyExtractor) {
+ @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+
+ final PriorityComparator<T> priorityComparator =
+ PriorityComparator.forPriorityComparableObjects();
+
+ Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> entry =
+ kvStateInformation.get(stateName);
- final ColumnFamilyHandle columnFamilyHandle =
- priorityQueueColumnFamilies.computeIfAbsent(
- stateName,
- (name) -> RocksDBKeyedStateBackend.this.createColumnFamily(name, pqDb));
+ if (entry == null) {
+ RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo =
+ new RegisteredPriorityQueueStateBackendMetaInfo<>(stateName, byteOrderedElementSerializer);
+
+ final ColumnFamilyHandle columnFamilyHandle = createColumnFamily(stateName);
+
+ entry = new Tuple2<>(columnFamilyHandle, metaInfo);
+ kvStateInformation.put(stateName, entry);
+ }
+
+ final ColumnFamilyHandle columnFamilyHandle = entry.f0;
@Nonnull
TieBreakingPriorityComparator<T> tieBreakingComparator =
new TieBreakingPriorityComparator<>(
- elementPriorityComparator,
+ priorityComparator,
byteOrderedElementSerializer,
elementSerializationOutStream,
elementSerializationOutView);
return new KeyGroupPartitionedPriorityQueue<>(
- keyExtractor,
- elementPriorityComparator,
+ KeyExtractorFunction.forKeyedObjects(),
+ priorityComparator,
new KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, CachingInternalPriorityQueueSet<T>>() {
@Nonnull
@Override
@@ -2714,7 +2708,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
new RocksDBOrderedSetStore<>(
keyGroupId,
keyGroupPrefixBytes,
- pqDb,
+ db,
columnFamilyHandle,
byteOrderedElementSerializer,
elementSerializationOutStream,
@@ -2727,20 +2721,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
keyGroupRange,
numberOfKeyGroups);
}
+ }
- @Override
- public void close() {
- IOUtils.closeQuietly(writeBatchWrapper);
- for (ColumnFamilyHandle columnFamilyHandle : priorityQueueColumnFamilies.values()) {
- IOUtils.closeQuietly(columnFamilyHandle);
- }
- IOUtils.closeQuietly(defaultColumnFamily);
- IOUtils.closeQuietly(pqDb);
- try {
- FileUtils.deleteDirectory(pqInstanceRocksDBPath);
- } catch (IOException ex) {
- LOG.warn("Could not delete instance path for PQ RocksDB: " + pqInstanceRocksDBPath, ex);
- }
- }
+ @Override
+ public boolean requiresLegacySynchronousTimerSnapshots() {
+ return priorityQueueFactory instanceof HeapPriorityQueueSetFactory;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 03faa44..aa5e93a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -257,7 +257,7 @@ class RocksDBListState<K, N, V>
@SuppressWarnings("unchecked")
static <E, K, N, SV, S extends State, IS extends S> IS create(
StateDescriptor<S, SV> stateDesc,
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
RocksDBKeyedStateBackend<K> backend) {
return (IS) new RocksDBListState<>(
registerResult.f0,
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 9d00a67..4ec1f77 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -30,7 +30,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -616,7 +616,7 @@ class RocksDBMapState<K, N, UK, UV>
@SuppressWarnings("unchecked")
static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(
StateDescriptor<S, SV> stateDesc,
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
RocksDBKeyedStateBackend<K> backend) {
return (IS) new RocksDBMapState<>(
registerResult.f0,
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
index 5284314..4068c50 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
@@ -159,7 +159,6 @@ public class RocksDBOrderedSetStore<T> implements CachingInternalPriorityQueueSe
public RocksToJavaIteratorAdapter orderedIterator() {
flushWriteBatch();
-
return new RocksToJavaIteratorAdapter(
new RocksIteratorWrapper(
db.newIterator(columnFamilyHandle)));
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index d138045..490960e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.util.FlinkRuntimeException;
@@ -172,7 +172,7 @@ class RocksDBReducingState<K, N, V>
@SuppressWarnings("unchecked")
static <K, N, SV, S extends State, IS extends S> IS create(
StateDescriptor<S, SV> stateDesc,
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
RocksDBKeyedStateBackend<K> backend) {
return (IS) new RocksDBReducingState<>(
registerResult.f0,
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 2b60fc1..5ae894e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.FlinkRuntimeException;
@@ -116,7 +116,7 @@ class RocksDBValueState<K, N, V>
@SuppressWarnings("unchecked")
static <K, N, SV, S extends State, IS extends S> IS create(
StateDescriptor<S, SV> stateDesc,
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+ Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
RocksDBKeyedStateBackend<K> backend) {
return (IS) new RocksDBValueState<>(
registerResult.f0,
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
index d54f122..05adf4f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
@@ -102,6 +102,10 @@ public class RocksDBWriteBatchWrapper implements AutoCloseable {
batch.clear();
}
+ public WriteOptions getOptions() {
+ return options;
+ }
+
@Override
public void close() throws RocksDBException {
if (batch.count() != 0) {
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 797a26a..52ba3e4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -402,7 +402,11 @@ public abstract class AbstractStreamOperator<OUT>
* @param context context that provides information and means required for taking a snapshot
*/
public void snapshotState(StateSnapshotContext context) throws Exception {
- if (getKeyedStateBackend() != null) {
+ final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
+ //TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
+ if (keyedStateBackend instanceof AbstractKeyedStateBackend &&
+ ((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {
+
KeyedStateCheckpointOutputStream out;
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java
deleted file mode 100644
index cc7fbc4..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyExtractorFunction;
-import org.apache.flink.runtime.state.KeyGroupPartitioner;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.StateSnapshot;
-import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
-
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.lang.reflect.Array;
-
-/**
- * This class represents the snapshot of an {@link HeapPriorityQueueSet}.
- *
- * @param <T> type of the state elements.
- */
-public class HeapPriorityQueueStateSnapshot<T> implements StateSnapshot {
-
- /** Function that extracts keys from elements. */
- @Nonnull
- private final KeyExtractorFunction<T> keyExtractor;
-
- /** Copy of the heap array containing all the (immutable or deeply copied) elements. */
- @Nonnull
- private final T[] heapArrayCopy;
-
- /** The element serializer. */
- @Nonnull
- private final TypeSerializer<T> elementSerializer;
-
- /** The key-group range covered by this snapshot. */
- @Nonnull
- private final KeyGroupRange keyGroupRange;
-
- /** The total number of key-groups in the job. */
- @Nonnegative
- private final int totalKeyGroups;
-
- /** Result of partitioning the snapshot by key-group. */
- @Nullable
- private KeyGroupPartitionedSnapshot partitionedSnapshot;
-
- HeapPriorityQueueStateSnapshot(
- @Nonnull T[] heapArrayCopy,
- @Nonnull KeyExtractorFunction<T> keyExtractor,
- @Nonnull TypeSerializer<T> elementSerializer,
- @Nonnull KeyGroupRange keyGroupRange,
- @Nonnegative int totalKeyGroups) {
-
- // TODO ensure that the array contains a deep copy of elements if we are *not* dealing with immutable types.
- assert elementSerializer.isImmutableType();
-
- this.keyExtractor = keyExtractor;
- this.heapArrayCopy = heapArrayCopy;
- this.elementSerializer = elementSerializer;
- this.keyGroupRange = keyGroupRange;
- this.totalKeyGroups = totalKeyGroups;
- }
-
- @SuppressWarnings("unchecked")
- @Nonnull
- @Override
- public KeyGroupPartitionedSnapshot partitionByKeyGroup() {
-
- if (partitionedSnapshot == null) {
-
- T[] partitioningOutput = (T[]) Array.newInstance(
- heapArrayCopy.getClass().getComponentType(),
- heapArrayCopy.length);
-
- KeyGroupPartitioner<T> keyGroupPartitioner =
- new KeyGroupPartitioner<>(
- heapArrayCopy,
- heapArrayCopy.length,
- partitioningOutput,
- keyGroupRange,
- totalKeyGroups,
- keyExtractor,
- elementSerializer::serialize);
-
- partitionedSnapshot = keyGroupPartitioner.partitionByKeyGroup();
- }
-
- return partitionedSnapshot;
- }
-
- @Override
- public void release() {
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
index ad1617e..29b89c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@@ -46,6 +46,11 @@ import java.util.Map;
@Internal
public class InternalTimeServiceManager<K> {
+ //TODO guard these constants with a test
+ private static final String TIMER_STATE_PREFIX = "_timer_state";
+ private static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
+ private static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";
+
private final int totalKeyGroups;
private final KeyGroupRange localKeyGroupRange;
private final KeyContext keyContext;
@@ -55,12 +60,14 @@ public class InternalTimeServiceManager<K> {
private final Map<String, HeapInternalTimerService<K, ?>> timerServices;
+ private final boolean useLegacySynchronousSnapshots;
+
InternalTimeServiceManager(
- int totalKeyGroups,
- KeyGroupRange localKeyGroupRange,
- KeyContext keyContext,
- PriorityQueueSetFactory priorityQueueSetFactory,
- ProcessingTimeService processingTimeService) {
+ int totalKeyGroups,
+ KeyGroupRange localKeyGroupRange,
+ KeyContext keyContext,
+ PriorityQueueSetFactory priorityQueueSetFactory,
+ ProcessingTimeService processingTimeService, boolean useLegacySynchronousSnapshots) {
Preconditions.checkArgument(totalKeyGroups > 0);
this.totalKeyGroups = totalKeyGroups;
@@ -68,6 +75,7 @@ public class InternalTimeServiceManager<K> {
this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory);
this.keyContext = Preconditions.checkNotNull(keyContext);
this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
+ this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots;
this.timerServices = new HashMap<>();
}
@@ -97,8 +105,8 @@ public class InternalTimeServiceManager<K> {
localKeyGroupRange,
keyContext,
processingTimeService,
- createTimerPriorityQueue("__ts_" + name + "/processing_timers", timerSerializer),
- createTimerPriorityQueue("__ts_" + name + "/event_timers", timerSerializer));
+ createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
+ createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));
timerServices.put(name, timerService);
}
@@ -114,9 +122,7 @@ public class InternalTimeServiceManager<K> {
TimerSerializer<K, N> timerSerializer) {
return priorityQueueSetFactory.create(
name,
- timerSerializer,
- InternalTimer.getTimerComparator(),
- InternalTimer.getKeyExtractorFunction());
+ timerSerializer);
}
public void advanceWatermark(Watermark watermark) throws Exception {
@@ -128,6 +134,7 @@ public class InternalTimeServiceManager<K> {
////////////////// Fault Tolerance Methods ///////////////////
public void snapshotStateForKeyGroup(DataOutputView stream, int keyGroupIdx) throws IOException {
+ Preconditions.checkState(useLegacySynchronousSnapshots);
InternalTimerServiceSerializationProxy<K> serializationProxy =
new InternalTimerServiceSerializationProxy<>(this, keyGroupIdx);
@@ -148,6 +155,10 @@ public class InternalTimeServiceManager<K> {
serializationProxy.read(stream);
}
+ public boolean isUseLegacySynchronousSnapshots() {
+ return useLegacySynchronousSnapshots;
+ }
+
//////////////////// Methods used ONLY IN TESTS ////////////////////
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
index f88b4fb..324f3fc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityComparator;
import javax.annotation.Nonnull;
@@ -31,7 +33,7 @@ import javax.annotation.Nonnull;
* @param <N> Type of the namespace to which timers are scoped.
*/
@Internal
-public interface InternalTimer<K, N> {
+public interface InternalTimer<K, N> extends PriorityComparable<InternalTimer<?, ?>>, Keyed<K> {
/** Function to extract the key from a {@link InternalTimer}. */
KeyExtractorFunction<InternalTimer<?, ?>> KEY_EXTRACTOR_FUNCTION = InternalTimer::getKey;
@@ -48,6 +50,7 @@ public interface InternalTimer<K, N> {
* Returns the key that is bound to this timer.
*/
@Nonnull
+ @Override
K getKey();
/**
@@ -55,14 +58,4 @@ public interface InternalTimer<K, N> {
*/
@Nonnull
N getNamespace();
-
- @SuppressWarnings("unchecked")
- static <T extends InternalTimer> PriorityComparator<T> getTimerComparator() {
- return (PriorityComparator<T>) TIMER_COMPARATOR;
- }
-
- @SuppressWarnings("unchecked")
- static <T extends InternalTimer> KeyExtractorFunction<T> getKeyExtractorFunction() {
- return (KeyExtractorFunction<T>) KEY_EXTRACTOR_FUNCTION;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
index dbb74e5..4f762e2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
@@ -19,9 +19,11 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
@@ -29,6 +31,8 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
@@ -96,7 +100,7 @@ public class InternalTimersSnapshotReaderWriters {
public final void writeTimersSnapshot(DataOutputView out) throws IOException {
writeKeyAndNamespaceSerializers(out);
- TimerHeapInternalTimer.TimerSerializer<K, N> timerSerializer = new TimerHeapInternalTimer.TimerSerializer<>(
+ LegacyTimerSerializer<K, N> timerSerializer = new LegacyTimerSerializer<>(
timersSnapshot.getKeySerializer(),
timersSnapshot.getNamespaceSerializer());
@@ -215,8 +219,8 @@ public class InternalTimersSnapshotReaderWriters {
restoreKeyAndNamespaceSerializers(restoredTimersSnapshot, in);
- TimerHeapInternalTimer.TimerSerializer<K, N> timerSerializer =
- new TimerHeapInternalTimer.TimerSerializer<>(
+ LegacyTimerSerializer<K, N> timerSerializer =
+ new LegacyTimerSerializer<>(
restoredTimersSnapshot.getKeySerializer(),
restoredTimersSnapshot.getNamespaceSerializer());
@@ -289,4 +293,120 @@ public class InternalTimersSnapshotReaderWriters {
restoredTimersSnapshot.setNamespaceSerializerConfigSnapshot(serializersAndConfigs.get(1).f1);
}
}
+
+ /**
+ * A legacy {@link TypeSerializer} for {@link TimerHeapInternalTimer}, used by the timer snapshot writer and reader in {@link InternalTimersSnapshotReaderWriters}.
+ */
+ public static class LegacyTimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer<K, N>> {
+
+ private static final long serialVersionUID = 1119562170939152304L;
+ // Delegate serializers for the timer's key and namespace; both are required (non-null).
+ @Nonnull
+ private final TypeSerializer<K> keySerializer;
+
+ @Nonnull
+ private final TypeSerializer<N> namespaceSerializer;
+
+ LegacyTimerSerializer(@Nonnull TypeSerializer<K> keySerializer, @Nonnull TypeSerializer<N> namespaceSerializer) {
+ this.keySerializer = keySerializer;
+ this.namespaceSerializer = namespaceSerializer;
+ }
+ // The serialized timer type is reported as mutable.
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<TimerHeapInternalTimer<K, N>> duplicate() {
+
+ final TypeSerializer<K> keySerializerDuplicate = keySerializer.duplicate();
+ final TypeSerializer<N> namespaceSerializerDuplicate = namespaceSerializer.duplicate();
+
+ if (keySerializerDuplicate == keySerializer &&
+ namespaceSerializerDuplicate == namespaceSerializer) {
+ // all delegate serializers seem stateless, so this is also stateless.
+ return this;
+ } else {
+ // at least one delegate serializer seems to be stateful, so we return a new instance.
+ return new LegacyTimerSerializer<>(keySerializerDuplicate, namespaceSerializerDuplicate);
+ }
+ }
+ // Creating an empty timer instance is not supported.
+ @Override
+ public TimerHeapInternalTimer<K, N> createInstance() {
+ throw new UnsupportedOperationException();
+ }
+ // Builds a new timer from the source's timestamp, key and namespace; key and namespace objects are shared, not deep-copied.
+ @Override
+ public TimerHeapInternalTimer<K, N> copy(TimerHeapInternalTimer<K, N> from) {
+ return new TimerHeapInternalTimer<>(from.getTimestamp(), from.getKey(), from.getNamespace());
+ }
+ // The reuse instance is ignored; a new timer is always returned.
+ @Override
+ public TimerHeapInternalTimer<K, N> copy(TimerHeapInternalTimer<K, N> from, TimerHeapInternalTimer<K, N> reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ // we do not have fixed length
+ return -1;
+ }
+ // Per-timer format: key, then namespace, then the timestamp as a long; deserialize() and copy() follow the same order.
+ @Override
+ public void serialize(TimerHeapInternalTimer<K, N> record, DataOutputView target) throws IOException {
+ keySerializer.serialize(record.getKey(), target);
+ namespaceSerializer.serialize(record.getNamespace(), target);
+ LongSerializer.INSTANCE.serialize(record.getTimestamp(), target);
+ }
+
+ @Override
+ public TimerHeapInternalTimer<K, N> deserialize(DataInputView source) throws IOException {
+ K key = keySerializer.deserialize(source);
+ N namespace = namespaceSerializer.deserialize(source);
+ Long timestamp = LongSerializer.INSTANCE.deserialize(source);
+ return new TimerHeapInternalTimer<>(timestamp, key, namespace);
+ }
+ // The reuse instance is ignored; a new timer is always deserialized.
+ @Override
+ public TimerHeapInternalTimer<K, N> deserialize(TimerHeapInternalTimer<K, N> reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+ // Copies one serialized timer field by field via the key, namespace and long serializers.
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ keySerializer.copy(source, target);
+ namespaceSerializer.copy(source, target);
+ LongSerializer.INSTANCE.copy(source, target);
+ }
+ // Equal to instances of the same class whose key and namespace serializers are equal.
+ @Override
+ public boolean equals(Object obj) {
+ return obj == this ||
+ (obj != null && obj.getClass() == getClass() &&
+ keySerializer.equals(((LegacyTimerSerializer) obj).keySerializer) &&
+ namespaceSerializer.equals(((LegacyTimerSerializer) obj).namespaceSerializer));
+ }
+ // NOTE(review): accepts any object, whereas equals() requires the same class; confirm this is intended.
+ @Override
+ public boolean canEqual(Object obj) {
+ return true;
+ }
+ // Consistent with equals(), but every instance of this class shares the same hash code.
+ @Override
+ public int hashCode() {
+ return getClass().hashCode();
+ }
+ // Not registered for managed state: configuration snapshots and compatibility checks are unsupported.
+ @Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+ }
+
+ @Override
+ public CompatibilityResult<TimerHeapInternalTimer<K, N>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index ca9cb0b..a6bee4c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -209,7 +209,8 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
keyGroupRange,
keyContext,
keyedStatedBackend,
- processingTimeService);
+ processingTimeService,
+ keyedStatedBackend.requiresLegacySynchronousTimerSnapshots());
// and then initialize the timer services
for (KeyGroupStatePartitionStreamProvider streamProvider : rawKeyedStates) {
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
index b9ef88e..a6194ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
@@ -19,19 +19,11 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
import javax.annotation.Nonnull;
-import java.io.IOException;
-
/**
* Implementation of {@link InternalTimer} to use with a {@link HeapPriorityQueueSet}.
*
@@ -133,119 +125,8 @@ public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>,
'}';
}
- /**
- * A {@link TypeSerializer} used to serialize/deserialize a {@link TimerHeapInternalTimer}.
- */
- public static class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer<K, N>> {
-
- private static final long serialVersionUID = 1119562170939152304L;
-
- @Nonnull
- private final TypeSerializer<K> keySerializer;
-
- @Nonnull
- private final TypeSerializer<N> namespaceSerializer;
-
- TimerSerializer(@Nonnull TypeSerializer<K> keySerializer, @Nonnull TypeSerializer<N> namespaceSerializer) {
- this.keySerializer = keySerializer;
- this.namespaceSerializer = namespaceSerializer;
- }
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public TypeSerializer<TimerHeapInternalTimer<K, N>> duplicate() {
-
- final TypeSerializer<K> keySerializerDuplicate = keySerializer.duplicate();
- final TypeSerializer<N> namespaceSerializerDuplicate = namespaceSerializer.duplicate();
-
- if (keySerializerDuplicate == keySerializer &&
- namespaceSerializerDuplicate == namespaceSerializer) {
- // all delegate serializers seem stateless, so this is also stateless.
- return this;
- } else {
- // at least one delegate serializer seems to be stateful, so we return a new instance.
- return new TimerSerializer<>(keySerializerDuplicate, namespaceSerializerDuplicate);
- }
- }
-
- @Override
- public TimerHeapInternalTimer<K, N> createInstance() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public TimerHeapInternalTimer<K, N> copy(TimerHeapInternalTimer<K, N> from) {
- return new TimerHeapInternalTimer<>(from.getTimestamp(), from.getKey(), from.getNamespace());
- }
-
- @Override
- public TimerHeapInternalTimer<K, N> copy(TimerHeapInternalTimer<K, N> from, TimerHeapInternalTimer<K, N> reuse) {
- return copy(from);
- }
-
- @Override
- public int getLength() {
- // we do not have fixed length
- return -1;
- }
-
- @Override
- public void serialize(TimerHeapInternalTimer<K, N> record, DataOutputView target) throws IOException {
- keySerializer.serialize(record.getKey(), target);
- namespaceSerializer.serialize(record.getNamespace(), target);
- LongSerializer.INSTANCE.serialize(record.getTimestamp(), target);
- }
-
- @Override
- public TimerHeapInternalTimer<K, N> deserialize(DataInputView source) throws IOException {
- K key = keySerializer.deserialize(source);
- N namespace = namespaceSerializer.deserialize(source);
- Long timestamp = LongSerializer.INSTANCE.deserialize(source);
- return new TimerHeapInternalTimer<>(timestamp, key, namespace);
- }
-
- @Override
- public TimerHeapInternalTimer<K, N> deserialize(TimerHeapInternalTimer<K, N> reuse, DataInputView source) throws IOException {
- return deserialize(source);
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- keySerializer.copy(source, target);
- namespaceSerializer.copy(source, target);
- LongSerializer.INSTANCE.copy(source, target);
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj == this ||
- (obj != null && obj.getClass() == getClass() &&
- keySerializer.equals(((TimerSerializer) obj).keySerializer) &&
- namespaceSerializer.equals(((TimerSerializer) obj).namespaceSerializer));
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return true;
- }
-
- @Override
- public int hashCode() {
- return getClass().hashCode();
- }
-
- @Override
- public TypeSerializerConfigSnapshot snapshotConfiguration() {
- throw new UnsupportedOperationException("This serializer is not registered for managed state.");
- }
-
- @Override
- public CompatibilityResult<TimerHeapInternalTimer<K, N>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
- throw new UnsupportedOperationException("This serializer is not registered for managed state.");
- }
+ @Override
+ public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
+ return Long.compare(timestamp, other.getTimestamp());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
index 87a3159..73f42ef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
@@ -31,8 +32,8 @@ import java.io.IOException;
import java.util.Objects;
/**
- * A serializer for {@link TimerHeapInternalTimer} objects that produces a serialization format that is aligned with
- * {@link InternalTimer#getTimerComparator()}.
+ * A serializer for {@link TimerHeapInternalTimer} objects that produces a serialization format that is
+ * lexicographically aligned with the priority of the timers.
*
* @param <K> type of the timer key.
* @param <N> type of the timer namespace.
@@ -201,13 +202,14 @@ public class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer
@Override
public TypeSerializerConfigSnapshot snapshotConfiguration() {
- throw new UnsupportedOperationException("This serializer is currently not used to write state.");
+ return new TimerSerializerConfigSnapshot<>(keySerializer, namespaceSerializer);
}
@Override
public CompatibilityResult<TimerHeapInternalTimer<K, N>> ensureCompatibility(
TypeSerializerConfigSnapshot configSnapshot) {
- throw new UnsupportedOperationException("This serializer is currently not used to write state.");
+ //TODO this is just a mock (assuming no serializer updates) for now and needs a proper implementation! change this before release.
+ return CompatibilityResult.compatible();
}
@Nonnull
@@ -219,4 +221,25 @@ public class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer
public TypeSerializer<N> getNamespaceSerializer() {
return namespaceSerializer;
}
+
+ /**
+ * Configuration snapshot of a {@link TimerSerializer}, capturing its key and namespace serializers.
+ *
+ * @param <K> type of key.
+ * @param <N> type of namespace.
+ */
+ public static class TimerSerializerConfigSnapshot<K, N> extends CompositeTypeSerializerConfigSnapshot {
+ // NOTE(review): the nullary constructor is presumably needed for reflective instantiation when snapshots are read back — confirm.
+ public TimerSerializerConfigSnapshot() {
+ }
+ // Passes the key and namespace serializers (in that order) to the composite snapshot.
+ public TimerSerializerConfigSnapshot(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer) {
+ super(keySerializer, namespaceSerializer);
+ }
+ // Snapshot format version, currently fixed at 0.
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
index 519f10e..957a535 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
@@ -910,8 +910,6 @@ public class HeapInternalTimerServiceTest {
PriorityQueueSetFactory priorityQueueSetFactory) {
return priorityQueueSetFactory.create(
name,
- timerSerializer,
- InternalTimer.getTimerComparator(),
- InternalTimer.getKeyExtractorFunction());
+ timerSerializer);
}
}