You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by gyfora <gi...@git.apache.org> on 2017/05/02 08:32:42 UTC

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114270508
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
     		}
     	}
     
    +	private static class RocksDBIncrementalSnapshotOperation {
    +
    +		// backend being snapshotted: its RocksDB instance (db), key serializer and
    +		// registered states (kvStateInformation) are read by this operation
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		// creates the checkpoint output streams that files and meta data are copied into
    +		private final CheckpointStreamFactory checkpointStreamFactory;
    +
    +		// id of the checkpoint; also names the local checkpoint directory ("chk-" + id)
    +		private final long checkpointId;
    +
    +		// timestamp of the checkpoint, passed along when creating checkpoint streams
    +		private final long checkpointTimestamp;
    +
    +		// sst file handles recorded for the last completed checkpoint; the comparison base
    +		// for deciding which sst files are new in this snapshot
    +		private Map<String, StreamStateHandle> baseSstFiles;
    +
    +		// meta info of every registered state, collected in takeSnapshot() and written
    +		// out by materializeMetaData()
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
    +
    +		// local directory (under the backend's instance base path) that the RocksDB
    +		// checkpoint is created in, and the file system it lives on
    +		private FileSystem backupFileSystem;
    +		private Path backupPath;
    +
    +		// streams open during materialization; registered with
    +		// stateBackend.cancelStreamRegistry while open and reset to null once closed
    +		private FSDataInputStream inputStream = null;
    +		private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
    +
    +		// new sst files since the last completed checkpoint
    +		private Set<String> newSstFileNames = new HashSet<>();
    +
    +		// handles to the sst files in the current snapshot
    +		private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
    +
    +		// handles to the misc files in the current snapshot
    +		private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
    +
    +		// NOTE(review): not assigned in the visible part of this diff; presumably holds the
    +		// handle returned by materializeMetaData() -- confirm against the rest of the class
    +		private StreamStateHandle metaStateHandle = null;
    +
    +		/**
    +		 * Creates the incremental snapshot operation for a single checkpoint.
    +		 *
    +		 * @param stateBackend backend whose RocksDB instance and registered states are snapshotted
    +		 * @param checkpointStreamFactory factory for the checkpoint streams receiving snapshot data
    +		 * @param checkpointId id of the checkpoint being taken
    +		 * @param checkpointTimestamp timestamp of the checkpoint being taken
    +		 */
    +		private RocksDBIncrementalSnapshotOperation(
    +				RocksDBKeyedStateBackend<?> stateBackend,
    +				CheckpointStreamFactory checkpointStreamFactory,
    +				long checkpointId,
    +				long checkpointTimestamp) {
    +			this.checkpointId = checkpointId;
    +			this.checkpointTimestamp = checkpointTimestamp;
    +			this.stateBackend = stateBackend;
    +			this.checkpointStreamFactory = checkpointStreamFactory;
    +		}
    +
    +		/**
    +		 * Copies the file at the given path into a new checkpoint state stream.
    +		 *
    +		 * <p>While the copy runs, both streams are registered with the backend's cancel stream
    +		 * registry. Afterwards they are always unregistered and closed, including when the copy
    +		 * fails or closing one of them fails.
    +		 *
    +		 * @param filePath path of the file to copy, opened through the file system of {@code backupPath}
    +		 * @return handle to the materialized copy of the file
    +		 * @throws Exception if opening, reading, writing or closing any of the streams fails
    +		 */
    +		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
    +			try {
    +				final byte[] buffer = new byte[1024];
    +
    +				// distinct local name so the field 'backupFileSystem' is not shadowed
    +				FileSystem sourceFileSystem = backupPath.getFileSystem();
    +				inputStream = sourceFileSystem.open(filePath);
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				int numBytes;
    +				while ((numBytes = inputStream.read(buffer)) != -1) {
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				// nested so that an exception while closing the input stream can no longer
    +				// leave the output stream open and registered with the cancel registry
    +				try {
    +					if (inputStream != null) {
    +						stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +						try {
    +							inputStream.close();
    +						} finally {
    +							inputStream = null;
    +						}
    +					}
    +				} finally {
    +					if (outputStream != null) {
    +						stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +						try {
    +							outputStream.close();
    +						} finally {
    +							outputStream = null;
    +						}
    +					}
    +				}
    +			}
    +		}
    +
    +		private StreamStateHandle materializeMetaData() throws Exception {
    +			try {
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
    +				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
    +
    +				serializationProxy.write(out);
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		void takeSnapshot() throws Exception {
    +			// use the last completed checkpoint as the comparison base.
    +			baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
    +
    +			// save meta data
    +			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) {
    +
    +				RegisteredBackendStateMetaInfo<?, ?> metaInfo = stateMetaInfoEntry.getValue().f1;
    +
    +				KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy =
    +					new KeyedBackendSerializationProxy.StateMetaInfo<>(
    +						metaInfo.getStateType(),
    +						metaInfo.getName(),
    +						metaInfo.getNamespaceSerializer(),
    +						metaInfo.getStateSerializer());
    +
    +				stateMetaInfos.add(metaInfoProxy);
    +			}
    +
    +			// save state data
    +			backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
    +			backupFileSystem = backupPath.getFileSystem();
    +			if (backupFileSystem.exists(backupPath)) {
    +				LOG.warn("Deleting an existing local checkpoint directory " +
    +					backupPath + ".");
    +
    +				backupFileSystem.delete(backupPath, true);
    +			}
    +
    +			// create hard links of living files in the checkpoint path
    +			Checkpoint checkpoint = Checkpoint.create(stateBackend.db);
    +			checkpoint.createCheckpoint(backupPath.getPath());
    --- End diff --
    
    I assume this will also create new sst file(s) for the current changes that are so far only in the log (i.e. not yet flushed). I wonder whether, with short snapshot intervals, this will lead to too many sst files over time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact the infrastructure team at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---