Posted to issues@flink.apache.org by shixiaogang <gi...@git.apache.org> on 2017/04/29 15:53:12 UTC

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

GitHub user shixiaogang opened a pull request:

    https://github.com/apache/flink/pull/3801

    [FLINK-6364] [checkpoints] Implement incremental checkpointing in RocksDBKeyedStateBackend

    This is the initial implementation of incremental checkpointing in RocksDBKeyedStateBackend. The changes include:
    1. Add a new `CheckpointType` for incremental checkpoints.
    2. Call the `restore()` method on every `KeyedStateBackend`, even if the restored state handles are null or empty.
    3. Implement `RocksDBIncrementalSnapshotOperation` and `RocksDBIncrementalRestoreOperation`, which support incremental snapshotting and restoring with and without parallelism changes, respectively.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shixiaogang/flink flink-6364

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3801.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3801
    
----
commit c5340a2186ecfbc48c46048db5adb11af83db511
Author: xiaogang.sxg <xi...@alibaba-inc.com>
Date:   2017-04-29T15:44:36Z

    Implement incremental checkpointing in RocksDBKeyedStateBackend

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114289498
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java ---
    @@ -0,0 +1,209 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyedStateHandle;
    +import org.apache.flink.runtime.state.SharedStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend}
    + */
    +public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class);
    +
    +	private static final long serialVersionUID = -8328808513197388231L;
    +
    +	private final JobID jobId;
    +
    +	private final String operatorIdentifier;
    +
    +	private final KeyGroupRange keyGroupRange;
    +
    +	private final Set<String> newSstFileNames;
    +
    +	private final Map<String, StreamStateHandle> sstFiles;
    +
    +	private final Map<String, StreamStateHandle> miscFiles;
    +
    +	private final StreamStateHandle metaStateHandle;
    +
    +	private boolean registered;
    +
    +	RocksDBKeyedStateHandle(
    +			JobID jobId,
    +			String operatorIdentifier,
    +			KeyGroupRange keyGroupRange,
    +			Set<String> newSstFileNames,
    +			Map<String, StreamStateHandle> sstFiles,
    +			Map<String, StreamStateHandle> miscFiles,
    +			StreamStateHandle metaStateHandle) {
    +
    +		this.jobId = jobId;
    +		this.operatorIdentifier = operatorIdentifier;
    +		this.keyGroupRange = keyGroupRange;
    +		this.newSstFileNames = newSstFileNames;
    +		this.sstFiles = sstFiles;
    +		this.miscFiles = miscFiles;
    +		this.metaStateHandle = metaStateHandle;
    +		this.registered = false;
    +	}
    +
    +	@Override
    +	public KeyGroupRange getKeyGroupRange() {
    +		return keyGroupRange;
    +	}
    +
    +	public Map<String, StreamStateHandle> getSstFiles() {
    +		return sstFiles;
    +	}
    +
    +	public Map<String, StreamStateHandle> getMiscFiles() {
    +		return miscFiles;
    +	}
    +
    +	public StreamStateHandle getMetaStateHandle() {
    +		return metaStateHandle;
    +	}
    +
    +	@Override
    +	public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
    +		if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
    +			return this;
    +		} else {
    +			return null;
    --- End diff --
    
    I wonder if it would make sense to also return something like `KeyedStateHandle.EMPTY` here that has a `KeyGroupRange.EMPTY_KEY_GROUP_RANGE` to avoid the use of `null` and the corresponding checks in other code parts.
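    
    For illustration, a minimal sketch of what such an empty handle could look like (neither `KeyedStateHandle.EMPTY` nor this class exists in the PR; all names here are hypothetical). The `getIntersection` method above could then return this singleton instead of `null`:
    
        import org.apache.flink.runtime.state.KeyGroupRange;
        import org.apache.flink.runtime.state.KeyedStateHandle;
    
        // Hypothetical singleton so that getIntersection() can avoid returning null.
        public final class EmptyKeyedStateHandle implements KeyedStateHandle {
    
            private static final long serialVersionUID = 1L;
    
            public static final EmptyKeyedStateHandle INSTANCE = new EmptyKeyedStateHandle();
    
            private EmptyKeyedStateHandle() {}
    
            @Override
            public KeyGroupRange getKeyGroupRange() {
                return KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
            }
    
            @Override
            public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
                return this;
            }
    
            @Override
            public void discardState() {
                // nothing to discard
            }
    
            @Override
            public long getStateSize() {
                return 0L;
            }
        }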



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114283114
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
     		}
     	}
     
    +	private static class RocksDBIncrementalSnapshotOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private final CheckpointStreamFactory checkpointStreamFactory;
    +
    +		private final long checkpointId;
    +
    +		private final long checkpointTimestamp;
    +
    +		private Map<String, StreamStateHandle> baseSstFiles;
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
    +
    +		private FileSystem backupFileSystem;
    +		private Path backupPath;
    +
    +		private FSDataInputStream inputStream = null;
    +		private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
    +
    +		// new sst files since the last completed checkpoint
    +		private Set<String> newSstFileNames = new HashSet<>();
    +
    +		// handles to the sst files in the current snapshot
    +		private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
    +
    +		// handles to the misc files in the current snapshot
    +		private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
    +
    +		private StreamStateHandle metaStateHandle = null;
    +
    +		private RocksDBIncrementalSnapshotOperation(
    +				RocksDBKeyedStateBackend<?> stateBackend,
    +				CheckpointStreamFactory checkpointStreamFactory,
    +				long checkpointId,
    +				long checkpointTimestamp) {
    +
    +			this.stateBackend = stateBackend;
    +			this.checkpointStreamFactory = checkpointStreamFactory;
    +			this.checkpointId = checkpointId;
    +			this.checkpointTimestamp = checkpointTimestamp;
    +		}
    +
    +		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
    +			try {
    +				final byte[] buffer = new byte[1024];
    +
    +				FileSystem backupFileSystem = backupPath.getFileSystem();
    +				inputStream = backupFileSystem.open(filePath);
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +					inputStream = null;
    +				}
    +
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		private StreamStateHandle materializeMetaData() throws Exception {
    +			try {
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
    +				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
    +
    +				serializationProxy.write(out);
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		void takeSnapshot() throws Exception {
    +			// use the last completed checkpoint as the comparison base.
    +			baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
    +
    +			// save meta data
    +			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) {
    +
    +				RegisteredBackendStateMetaInfo<?, ?> metaInfo = stateMetaInfoEntry.getValue().f1;
    +
    +				KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy =
    +					new KeyedBackendSerializationProxy.StateMetaInfo<>(
    +						metaInfo.getStateType(),
    +						metaInfo.getName(),
    +						metaInfo.getNamespaceSerializer(),
    +						metaInfo.getStateSerializer());
    +
    +				stateMetaInfos.add(metaInfoProxy);
    +			}
    +
    +			// save state data
    +			backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
    +			backupFileSystem = backupPath.getFileSystem();
    +			if (backupFileSystem.exists(backupPath)) {
    +				LOG.warn("Deleting an existing local checkpoint directory " +
    +					backupPath + ".");
    +
    +				backupFileSystem.delete(backupPath, true);
    +			}
    +
    +			// create hard links of living files in the checkpoint path
    +			Checkpoint checkpoint = Checkpoint.create(stateBackend.db);
    +			checkpoint.createCheckpoint(backupPath.getPath());
    --- End diff --
    
    @gyfora I think your initial concerns are valid. We need to flush the memtable to disk prior to a checkpoint, and this will generate an SST. What is required for fast checkpoint intervals is an alternative mechanism to quickly determine a delta from the previous incremental checkpoint. This could be a changelog buffer in Flink that we maintain up to a certain size. Its content becomes part of the private state in the incremental snapshot, and it is dropped (i) after each checkpoint or (ii) after exceeding a certain size, at which point enough work has accumulated to justify writing a new SST.
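    
    A minimal sketch of the changelog-buffer idea under the assumptions above (nothing like this exists in the PR; the class name, the 64 MB limit, and the entry format are all illustrative):
    
        import java.util.ArrayList;
        import java.util.List;
    
        // Illustrative only: writes are mirrored into a bounded in-memory log that would be
        // snapshotted as private (non-shared) state of the incremental checkpoint.
        class ChangelogBuffer {
    
            // assumed upper limit; beyond this we rely on a freshly flushed SST instead
            private static final long MAX_BYTES = 64 * 1024 * 1024;
    
            private final List<byte[]> entries = new ArrayList<>();
            private long sizeInBytes;
    
            /** Records one serialized write; returns false once the buffer is considered full. */
            boolean append(byte[] serializedEntry) {
                if (sizeInBytes + serializedEntry.length > MAX_BYTES) {
                    return false;
                }
                entries.add(serializedEntry);
                sizeInBytes += serializedEntry.length;
                return true;
            }
    
            /** Dropped after every completed checkpoint, as described above. */
            void clear() {
                entries.clear();
                sizeInBytes = 0L;
            }
        }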



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114359825
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
     		}
     	}
     
    +	private static class RocksDBIncrementalSnapshotOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private final CheckpointStreamFactory checkpointStreamFactory;
    +
    +		private final long checkpointId;
    +
    +		private final long checkpointTimestamp;
    +
    +		private Map<String, StreamStateHandle> baseSstFiles;
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
    +
    +		private FileSystem backupFileSystem;
    +		private Path backupPath;
    +
    +		private FSDataInputStream inputStream = null;
    +		private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
    +
    +		// new sst files since the last completed checkpoint
    +		private Set<String> newSstFileNames = new HashSet<>();
    +
    +		// handles to the sst files in the current snapshot
    +		private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
    +
    +		// handles to the misc files in the current snapshot
    +		private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
    +
    +		private StreamStateHandle metaStateHandle = null;
    +
    +		private RocksDBIncrementalSnapshotOperation(
    +				RocksDBKeyedStateBackend<?> stateBackend,
    +				CheckpointStreamFactory checkpointStreamFactory,
    +				long checkpointId,
    +				long checkpointTimestamp) {
    +
    +			this.stateBackend = stateBackend;
    +			this.checkpointStreamFactory = checkpointStreamFactory;
    +			this.checkpointId = checkpointId;
    +			this.checkpointTimestamp = checkpointTimestamp;
    +		}
    +
    +		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
    +			try {
    +				final byte[] buffer = new byte[1024];
    +
    +				FileSystem backupFileSystem = backupPath.getFileSystem();
    +				inputStream = backupFileSystem.open(filePath);
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +					inputStream = null;
    +				}
    +
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		private StreamStateHandle materializeMetaData() throws Exception {
    +			try {
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
    +				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
    +
    +				serializationProxy.write(out);
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		void takeSnapshot() throws Exception {
    +			// use the last completed checkpoint as the comparison base.
    +			baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
    +
    +			// save meta data
    +			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) {
    +
    +				RegisteredBackendStateMetaInfo<?, ?> metaInfo = stateMetaInfoEntry.getValue().f1;
    +
    +				KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy =
    +					new KeyedBackendSerializationProxy.StateMetaInfo<>(
    +						metaInfo.getStateType(),
    +						metaInfo.getName(),
    +						metaInfo.getNamespaceSerializer(),
    +						metaInfo.getStateSerializer());
    +
    +				stateMetaInfos.add(metaInfoProxy);
    +			}
    +
    +			// save state data
    +			backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
    +			backupFileSystem = backupPath.getFileSystem();
    +			if (backupFileSystem.exists(backupPath)) {
    +				LOG.warn("Deleting an existing local checkpoint directory " +
    +					backupPath + ".");
    +
    +				backupFileSystem.delete(backupPath, true);
    +			}
    +
    +			// create hard links of living files in the checkpoint path
    +			Checkpoint checkpoint = Checkpoint.create(stateBackend.db);
    +			checkpoint.createCheckpoint(backupPath.getPath());
    +		}
    +
    +		KeyedStateHandle materializeSnapshot() throws Exception {
    +			// write meta data
    +			metaStateHandle = materializeMetaData();
    +
    +			// write state data
    +			Preconditions.checkState(backupFileSystem.exists(backupPath));
    +
    +			FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath);
    +			if (fileStatuses != null ) {
    +				for (FileStatus fileStatus : fileStatuses) {
    +					Path filePath = fileStatus.getPath();
    +					String fileName = filePath.getName();
    +
    +					if (fileName.endsWith(SST_FILE_SUFFIX)) {
    +						StreamStateHandle fileHandle =
    +							baseSstFiles == null ? null : baseSstFiles.get(fileName);
    +
    +						if (fileHandle == null) {
    +							newSstFileNames.add(fileName);
    +							fileHandle = materializeStateData(filePath);
    +						}
    +
    +						sstFiles.put(fileName, fileHandle);
    +					} else {
    +						StreamStateHandle fileHandle = materializeStateData(filePath);
    +						miscFiles.put(fileName, fileHandle);
    +					}
    +				}
    +			}
    +
    +			stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
    +
    +			return new RocksDBKeyedStateHandle(stateBackend.jobId,
    +				stateBackend.operatorIdentifier, stateBackend.keyGroupRange,
    +				newSstFileNames, sstFiles, miscFiles, metaStateHandle);
    +		}
    +
    +		void releaseResources(boolean canceled) {
    +
    +			if (inputStream != null) {
    +				stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +				try {
    +					inputStream.close();
    +				} catch (Exception e) {
    +					LOG.warn("Could not properly close the input stream.", e);
    +				}
    +				inputStream = null;
    +			}
    +
    +			if (outputStream != null) {
    +				stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +				try {
    +					outputStream.close();
    +				} catch (Exception e) {
    +					LOG.warn("Could not properly close the output stream.", e);
    +				}
    +				outputStream = null;
    +			}
    +
    +			if (backupPath != null) {
    +				try {
    +					if (backupFileSystem.exists(backupPath)) {
    +						backupFileSystem.delete(backupPath, true);
    +					}
    +				} catch (Exception e) {
    +					LOG.warn("Could not properly delete the checkpoint directory.", e);
    +				}
    +			}
    +
    +			if (canceled) {
    +				List<StateObject> statesToDiscard = new ArrayList<>();
    +
    +				if (metaStateHandle != null) {
    +					statesToDiscard.add(metaStateHandle);
    +				}
    +
    +				statesToDiscard.addAll(miscFiles.values());
    +
    +				for (String newSstFileName : newSstFileNames) {
    +					StreamStateHandle fileHandle = sstFiles.get(newSstFileName);
    +					if (fileHandle != null) {
    +						statesToDiscard.add(fileHandle);
    +					}
    +				}
    +
    +				try {
    +					StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
    --- End diff --
    
    This looks like we should also have some safety-net mechanism or global cleanup hook in case an exception happens or the node goes down between the cancellation and the triggering of the discard. In that case, all knowledge about the created files is lost. I wonder if the shared state registry should perform cleanups at startup (maybe later also after incremental recoveries) for unreferenced files in the shared directory. However, this sounds like it could be an expensive operation on some file systems.
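    
    A rough sketch of such a startup cleanup, assuming the shared files live under a single directory and the registry can report which file names are still referenced (both assumptions; not part of this PR):
    
        import java.io.IOException;
        import java.util.Set;
    
        import org.apache.flink.core.fs.FileStatus;
        import org.apache.flink.core.fs.FileSystem;
        import org.apache.flink.core.fs.Path;
    
        // Hypothetical safety net: delete files in the shared state directory that are not
        // referenced by any registered state handle. Potentially expensive, as noted above.
        static void cleanUpUnreferencedSharedFiles(
                FileSystem fs,
                Path sharedStateDir,
                Set<String> referencedFileNames) throws IOException {
    
            FileStatus[] statuses = fs.listStatus(sharedStateDir);
            if (statuses == null) {
                return;
            }
    
            for (FileStatus status : statuses) {
                if (!referencedFileNames.contains(status.getPath().getName())) {
                    fs.delete(status.getPath(), false);
                }
            }
        }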



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114284380
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java ---
    @@ -0,0 +1,209 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyedStateHandle;
    +import org.apache.flink.runtime.state.SharedStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend}
    + */
    +public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle {
    --- End diff --
    
    I think somehow adding the word `incremental` to the class name would help to differentiate this from other state handles created by RocksDB in full backups, because that is the distinguishing feature of this handle.



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by shixiaogang <gi...@git.apache.org>.
Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114565968
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java ---
    @@ -0,0 +1,209 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyedStateHandle;
    +import org.apache.flink.runtime.state.SharedStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend}
    + */
    +public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class);
    +
    +	private static final long serialVersionUID = -8328808513197388231L;
    +
    +	private final JobID jobId;
    +
    +	private final String operatorIdentifier;
    +
    +	private final KeyGroupRange keyGroupRange;
    +
    +	private final Set<String> newSstFileNames;
    +
    +	private final Map<String, StreamStateHandle> sstFiles;
    +
    +	private final Map<String, StreamStateHandle> miscFiles;
    +
    +	private final StreamStateHandle metaStateHandle;
    +
    +	private boolean registered;
    +
    +	RocksDBKeyedStateHandle(
    +			JobID jobId,
    +			String operatorIdentifier,
    +			KeyGroupRange keyGroupRange,
    +			Set<String> newSstFileNames,
    +			Map<String, StreamStateHandle> sstFiles,
    +			Map<String, StreamStateHandle> miscFiles,
    +			StreamStateHandle metaStateHandle) {
    +
    +		this.jobId = jobId;
    +		this.operatorIdentifier = operatorIdentifier;
    +		this.keyGroupRange = keyGroupRange;
    +		this.newSstFileNames = newSstFileNames;
    +		this.sstFiles = sstFiles;
    +		this.miscFiles = miscFiles;
    +		this.metaStateHandle = metaStateHandle;
    +		this.registered = false;
    +	}
    +
    +	@Override
    +	public KeyGroupRange getKeyGroupRange() {
    +		return keyGroupRange;
    +	}
    +
    +	public Map<String, StreamStateHandle> getSstFiles() {
    +		return sstFiles;
    +	}
    +
    +	public Map<String, StreamStateHandle> getMiscFiles() {
    +		return miscFiles;
    +	}
    +
    +	public StreamStateHandle getMetaStateHandle() {
    +		return metaStateHandle;
    +	}
    +
    +	@Override
    +	public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
    +		if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
    +			return this;
    +		} else {
    +			return null;
    +		}
    +	}
    +
    +	@Override
    +	public void discardState() throws Exception {
    +
    +		try {
    +			metaStateHandle.discardState();
    +		} catch (Exception e) {
    +			LOG.warn("Could not properly discard meta data.", e);
    +		}
    +
    +		try {
    +			StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values());
    +		} catch (Exception e) {
    +			LOG.warn("Could not properly discard misc file state.", e);
    +		}
    +
    +		if (!registered) {
    +			for (String newSstFileName : newSstFileNames) {
    +				StreamStateHandle handle = sstFiles.get(newSstFileName);
    +				try {
    +					handle.discardState();
    +				} catch (Exception e) {
    +					LOG.warn("Could not properly discard sst file state", e);
    +				}
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public long getStateSize() {
    +		long size = StateUtil.getStateSize(metaStateHandle);
    +
    +		for (StreamStateHandle sstFileHandle : sstFiles.values()) {
    +			size += sstFileHandle.getStateSize();
    +		}
    +
    +		for (StreamStateHandle miscFileHandle : miscFiles.values()) {
    +			size += miscFileHandle.getStateSize();
    +		}
    +
    +		return size;
    +	}
    +
    +	@Override
    +	public void registerSharedStates(SharedStateRegistry stateRegistry) {
    +		Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
    +
    +		for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) {
    +			SstFileStateHandle stateHandle = new SstFileStateHandle(sstFileEntry.getKey(), sstFileEntry.getValue());
    +
    +			int referenceCount = stateRegistry.register(stateHandle);
    +
    +			if (newSstFileNames.contains(sstFileEntry.getKey())) {
    +				Preconditions.checkState(referenceCount == 1);
    +			} else {
    +				Preconditions.checkState(referenceCount > 1);
    +			}
    +		}
    +
    +		registered = true;
    +	}
    +
    +	@Override
    +	public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
    +		Preconditions.checkState(registered, "The state handle has not registered its shared states yet.");
    +
    +		for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) {
    +			stateRegistry.unregister(new SstFileStateHandle(sstFileEntry.getKey(), sstFileEntry.getValue()));
    +		}
    +
    +		registered = false;
    +	}
    +
    +	private class SstFileStateHandle implements SharedStateHandle {
    --- End diff --
    
    The `StreamStateHandle` returned by the `CheckpointStateOutputStream` may also be typed `ByteArrayStreamHandle`, so we cannot simply use `SharedFileStateHandle` here. I think we should leave the code here unchanged and do the refactoring in the future. What do you think?



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by shixiaogang <gi...@git.apache.org>.
Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114703946
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException {
     		}
     	}
     
    +	private static class RocksDBIncrementalRestoreOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) {
    +			this.stateBackend = stateBackend;
    +		}
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> readMetaData(
    +				StreamStateHandle metaStateHandle) throws Exception {
    +
    +			FSDataInputStream inputStream = null;
    +
    +			try {
    +				inputStream = metaStateHandle.openInputStream();
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
    +				DataInputView in = new DataInputViewStreamWrapper(inputStream);
    +				serializationProxy.read(in);
    +
    +				return serializationProxy.getNamedStateSerializationProxies();
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +				}
    +			}
    +		}
    +
    +		private void readStateData(
    +				Path restoreFilePath,
    +				StreamStateHandle remoteFileHandle) throws IOException {
    +
    +			FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
    +
    +			FSDataInputStream inputStream = null;
    +			FSDataOutputStream outputStream = null;
    +
    +			try {
    +				inputStream = remoteFileHandle.openInputStream();
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				byte[] buffer = new byte[1024];
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +				}
    +
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +				}
    +			}
    +		}
    +
    +		private void restoreInstance(
    +				RocksDBKeyedStateHandle restoreStateHandle,
    +				boolean hasExtraKeys) throws Exception {
    +
    +			// read state data
    +			Path restoreInstancePath = new Path(
    +				stateBackend.instanceBasePath.getAbsolutePath(),
    +				UUID.randomUUID().toString());
    +
    +			try {
    +				Map<String, StreamStateHandle> sstFiles = restoreStateHandle.getSstFiles();
    +				for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) {
    +					String fileName = sstFileEntry.getKey();
    +					StreamStateHandle remoteFileHandle = sstFileEntry.getValue();
    +
    +					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
    +				}
    +
    +				Map<String, StreamStateHandle> miscFiles = restoreStateHandle.getMiscFiles();
    +				for (Map.Entry<String, StreamStateHandle> miscFileEntry : miscFiles.entrySet()) {
    +					String fileName = miscFileEntry.getKey();
    +					StreamStateHandle remoteFileHandle = miscFileEntry.getValue();
    +
    +					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
    +				}
    +
    +				// read meta data
    +				List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoProxies =
    +					readMetaData(restoreStateHandle.getMetaStateHandle());
    +
    +				List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
    +				columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes(ConfigConstants.DEFAULT_CHARSET)));
    +
    +				for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy : stateMetaInfoProxies) {
    +
    +					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
    +						stateMetaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
    +						stateBackend.columnOptions);
    +
    +					columnFamilyDescriptors.add(columnFamilyDescriptor);
    +				}
    +
    +				if (hasExtraKeys) {
    --- End diff --
    
    The optimization targets deletions within the same instance. In our case, we are moving key-value pairs from one instance to another, so the delete-a-range-of-keys optimization may not apply here. Multi-put may help to improve the performance, but we observed little improvement in our experiments.
    
    Maybe we can leave it as it is and do the optimization in the future. What do you think?
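    
    For context, a rough and simplified sketch of the copy loop used when there are extra keys, i.e. reading a restored instance and re-inserting only the entries that belong to the target backend's key-group range (`extractKeyGroup` and the fixed two-byte key-group prefix are illustrative assumptions, not the exact code of this PR):
    
        import org.apache.flink.runtime.state.KeyGroupRange;
    
        import org.rocksdb.ColumnFamilyHandle;
        import org.rocksdb.RocksDB;
        import org.rocksdb.RocksDBException;
        import org.rocksdb.RocksIterator;
    
        // Copy only the entries whose key group falls into the target backend's range.
        static void copyOwnedEntries(
                RocksDB restoreDb,
                ColumnFamilyHandle sourceColumn,
                RocksDB targetDb,
                ColumnFamilyHandle targetColumn,
                KeyGroupRange targetRange) throws RocksDBException {
    
            try (RocksIterator iterator = restoreDb.newIterator(sourceColumn)) {
                for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                    byte[] key = iterator.key();
                    if (targetRange.contains(extractKeyGroup(key))) {
                        targetDb.put(targetColumn, key, iterator.value());
                    }
                }
            }
        }
    
        // Assumes the key group is serialized as a two-byte big-endian prefix of the key;
        // the actual backend derives it from its configured key-group prefix length.
        static int extractKeyGroup(byte[] key) {
            return ((key[0] & 0xFF) << 8) | (key[1] & 0xFF);
        }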



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114320148
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java ---
    @@ -0,0 +1,209 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyedStateHandle;
    +import org.apache.flink.runtime.state.SharedStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend}
    + */
    +public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class);
    +
    +	private static final long serialVersionUID = -8328808513197388231L;
    +
    +	private final JobID jobId;
    +
    +	private final String operatorIdentifier;
    +
    +	private final KeyGroupRange keyGroupRange;
    +
    +	private final Set<String> newSstFileNames;
    --- End diff --
    
    Overall, I wonder how this class impacts larger deployments, where a lot of these state handles are sent via RPC. I suggest keeping the serialization footprint as small as possible. For example, maybe we can eliminate `newSstFileNames` by introducing two `Map<String, StreamStateHandle>` fields, `newSstFiles` and `previousSstFiles`, instead of checking against a set. This could even simplify some methods, e.g. the registration or deletion code. We could then provide a method that delivers a combined iterator over all `Entry<String, StreamStateHandle>` from both maps.
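    
    A small fragment sketching that suggested shape (this is the reviewer's proposal, not what the PR currently implements; field and method names are taken from the comment above, and the fragment assumes the usual `java.util` imports):
    
        private final Map<String, StreamStateHandle> newSstFiles;
        private final Map<String, StreamStateHandle> previousSstFiles;
    
        /** Combined view over all sst files of this checkpoint, new and previously uploaded. */
        public Iterable<Map.Entry<String, StreamStateHandle>> getAllSstFiles() {
            List<Map.Entry<String, StreamStateHandle>> combined =
                new ArrayList<>(newSstFiles.size() + previousSstFiles.size());
            combined.addAll(newSstFiles.entrySet());
            combined.addAll(previousSstFiles.entrySet());
            return combined;
        }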



[GitHub] flink issue #3801: [FLINK-6364] [checkpoints] Implement incremental checkpoi...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the issue:

    https://github.com/apache/flink/pull/3801
  
    @StefanRRichter Do you think we should try to figure out which SST files have been compacted and exclude them from recovery?



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114504710
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
     		}
     	}
     
    +	private static class RocksDBIncrementalSnapshotOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private final CheckpointStreamFactory checkpointStreamFactory;
    +
    +		private final long checkpointId;
    +
    +		private final long checkpointTimestamp;
    +
    +		private Map<String, StreamStateHandle> baseSstFiles;
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
    +
    +		private FileSystem backupFileSystem;
    +		private Path backupPath;
    +
    +		private FSDataInputStream inputStream = null;
    +		private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
    +
    +		// new sst files since the last completed checkpoint
    +		private Set<String> newSstFileNames = new HashSet<>();
    +
    +		// handles to the sst files in the current snapshot
    +		private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
    +
    +		// handles to the misc files in the current snapshot
    +		private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
    +
    +		private StreamStateHandle metaStateHandle = null;
    +
    +		private RocksDBIncrementalSnapshotOperation(
    +				RocksDBKeyedStateBackend<?> stateBackend,
    +				CheckpointStreamFactory checkpointStreamFactory,
    +				long checkpointId,
    +				long checkpointTimestamp) {
    +
    +			this.stateBackend = stateBackend;
    +			this.checkpointStreamFactory = checkpointStreamFactory;
    +			this.checkpointId = checkpointId;
    +			this.checkpointTimestamp = checkpointTimestamp;
    +		}
    +
    +		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
    +			try {
    +				final byte[] buffer = new byte[1024];
    +
    +				FileSystem backupFileSystem = backupPath.getFileSystem();
    +				inputStream = backupFileSystem.open(filePath);
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +					inputStream = null;
    +				}
    +
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		private StreamStateHandle materializeMetaData() throws Exception {
    +			try {
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
    +				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
    +
    +				serializationProxy.write(out);
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		void takeSnapshot() throws Exception {
    +			// use the last completed checkpoint as the comparison base.
    +			baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
    +
    +			// save meta data
    +			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) {
    +
    +				RegisteredBackendStateMetaInfo<?, ?> metaInfo = stateMetaInfoEntry.getValue().f1;
    +
    +				KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy =
    +					new KeyedBackendSerializationProxy.StateMetaInfo<>(
    +						metaInfo.getStateType(),
    +						metaInfo.getName(),
    +						metaInfo.getNamespaceSerializer(),
    +						metaInfo.getStateSerializer());
    +
    +				stateMetaInfos.add(metaInfoProxy);
    +			}
    +
    +			// save state data
    +			backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
    +			backupFileSystem = backupPath.getFileSystem();
    +			if (backupFileSystem.exists(backupPath)) {
    +				LOG.warn("Deleting an existing local checkpoint directory " +
    +					backupPath + ".");
    +
    +				backupFileSystem.delete(backupPath, true);
    +			}
    +
    +			// create hard links of living files in the checkpoint path
    +			Checkpoint checkpoint = Checkpoint.create(stateBackend.db);
    +			checkpoint.createCheckpoint(backupPath.getPath());
    --- End diff --
    
    That is a valid concern @shixiaogang, so there are two points to that:
    1) We have a clear upper limit on the buffer size (e.g. 64 MB). Once the limit of diffs is reached, we can drop the buffer, because we can assume enough work was done to justify a new SST file.
    
    2) We write the buffer to a local FS, so we can expect this to be reasonably fast and not to suffer from the kind of blocking that we have in DFS. Technically, flushing the SST file can also block. Then, in the async part, we can transfer the local buffer file to DFS.



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by shixiaogang <gi...@git.apache.org>.
Github user shixiaogang closed the pull request at:

    https://github.com/apache/flink/pull/3801



[GitHub] flink issue #3801: [FLINK-6364] [checkpoints] Implement incremental checkpoi...

Posted by shixiaogang <gi...@git.apache.org>.
Github user shixiaogang commented on the issue:

    https://github.com/apache/flink/pull/3801
  
    @StefanRRichter Thanks a lot for your review. I have updated the pull request as suggested. The following changes were made:
    
    1. Remove the checkpoint type for incremental checkpoints. Support for incremental checkpointing is now a configurable feature of `RocksDBKeyedStateBackend`, just like asynchronous checkpointing in `HeapKeyedStateBackend`. Incremental checkpointing is performed if the feature is enabled and the checkpoint to perform is not a savepoint (see the configuration sketch after this message).
    
    2. Rename `RocksDBKeyedStateHandle` to `RocksDBIncrementalKeyedStateHandle` and do some refactoring.
    
    3. Allow `KeyedStateHandle` to register shared states.
    
    4. Maintain the information about the last completed checkpoint via the notification in `AbstractStreamOperator`.
    
    5. Parameterize `RocksDBStateBackendTest` to test the cleanup of resources in both full and incremental checkpointing.
    
    6. Parameterize `PartitionedStateCheckpointingITCase` to test snapshotting and restoring with different backend settings.
    
    I would appreciate it if you could take a look at these changes. Any comments are welcome.
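    
    As a usage illustration only, a minimal sketch of enabling the feature through the state backend configuration; the boolean constructor flag and the checkpoint path are assumptions about how the setting could be exposed, not something this PR pins down:
    
        import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
        public class IncrementalCheckpointJob {
    
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
                // assumed flag: 'true' requests incremental checkpoints;
                // savepoints are still taken as full, self-contained snapshots
                env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
    
                // each periodic checkpoint then only uploads sst files created since the last one
                env.enableCheckpointing(10_000L);
            }
        }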



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by shixiaogang <gi...@git.apache.org>.
Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114565991
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
    @@ -769,9 +769,10 @@ public OperatorStateBackend createOperatorStateBackend(
     		cancelables.registerClosable(keyedStateBackend);
     
     		// restore if we have some old state
    -		if (null != restoreStateHandles && null != restoreStateHandles.getManagedKeyedState()) {
    -			keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState());
    -		}
    +		Collection<KeyedStateHandle> restoreKeyedStateHandles =
    +			restoreStateHandles == null ? null : restoreStateHandles.getManagedKeyedState();
    +
    +		keyedStateBackend.restore(restoreKeyedStateHandles);
    --- End diff --
    
    I attempted to move the state restore into the constructor as we discussed, but it turns out to be impossible.
    
    All state backends must be registered with the task so that they can be closed when the task is canceled. If we put the restore into the constructors of the backends, construction may block (e.g., due to access to HDFS). Since construction has not completed yet, the backend will not be registered and hence cannot be closed.



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114580123
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
    @@ -769,9 +769,10 @@ public OperatorStateBackend createOperatorStateBackend(
     		cancelables.registerClosable(keyedStateBackend);
     
     		// restore if we have some old state
    -		if (null != restoreStateHandles && null != restoreStateHandles.getManagedKeyedState()) {
    -			keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState());
    -		}
    +		Collection<KeyedStateHandle> restoreKeyedStateHandles =
    +			restoreStateHandles == null ? null : restoreStateHandles.getManagedKeyedState();
    +
    +		keyedStateBackend.restore(restoreKeyedStateHandles);
    --- End diff --
    
    Ok, then we can also look at this again later. For now it is ok as is.



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114363332
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException {
     		}
     	}
     
    +	private static class RocksDBIncrementalRestoreOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) {
    +			this.stateBackend = stateBackend;
    +		}
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> readMetaData(
    +				StreamStateHandle metaStateHandle) throws Exception {
    +
    +			FSDataInputStream inputStream = null;
    +
    +			try {
    +				inputStream = metaStateHandle.openInputStream();
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
    +				DataInputView in = new DataInputViewStreamWrapper(inputStream);
    +				serializationProxy.read(in);
    +
    +				return serializationProxy.getNamedStateSerializationProxies();
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +				}
    +			}
    +		}
    +
    +		private void readStateData(
    +				Path restoreFilePath,
    +				StreamStateHandle remoteFileHandle) throws IOException {
    +
    +			FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
    +
    +			FSDataInputStream inputStream = null;
    +			FSDataOutputStream outputStream = null;
    +
    +			try {
    +				inputStream = remoteFileHandle.openInputStream();
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				byte[] buffer = new byte[1024];
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +				}
    +
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +				}
    +			}
    +		}
    +
    +		private void restoreInstance(
    +				RocksDBKeyedStateHandle restoreStateHandle,
    +				boolean hasExtraKeys) throws Exception {
    +
    +			// read state data
    +			Path restoreInstancePath = new Path(
    +				stateBackend.instanceBasePath.getAbsolutePath(),
    +				UUID.randomUUID().toString());
    +
    +			try {
    +				Map<String, StreamStateHandle> sstFiles = restoreStateHandle.getSstFiles();
    +				for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) {
    +					String fileName = sstFileEntry.getKey();
    +					StreamStateHandle remoteFileHandle = sstFileEntry.getValue();
    +
    +					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
    +				}
    +
    +				Map<String, StreamStateHandle> miscFiles = restoreStateHandle.getMiscFiles();
    +				for (Map.Entry<String, StreamStateHandle> miscFileEntry : miscFiles.entrySet()) {
    +					String fileName = miscFileEntry.getKey();
    +					StreamStateHandle remoteFileHandle = miscFileEntry.getValue();
    +
    +					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
    +				}
    +
    +				// read meta data
    +				List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoProxies =
    +					readMetaData(restoreStateHandle.getMetaStateHandle());
    +
    +				List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
    +				columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes(ConfigConstants.DEFAULT_CHARSET)));
    +
    +				for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy : stateMetaInfoProxies) {
    +
    +					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
    +						stateMetaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
    +						stateBackend.columnOptions);
    +
    +					columnFamilyDescriptors.add(columnFamilyDescriptor);
    +				}
    +
    +				if (hasExtraKeys) {
    +
    +					List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
    +
    +					RocksDB restoreDb = RocksDB.open(
    --- End diff --
    
    I suggest using try-with-resources for the `restoreDb`. The instance is never closed, and I wonder whether the restore instance directory should also be deleted as part of some safety-net cleanup hook (in the future?).
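    
    A minimal sketch of the suggestion, as a drop-in for the code above (`stateBackend.dbOptions` is an assumption on my side, based on the options fields used elsewhere in this class):
    
        try (RocksDB restoreDb = RocksDB.open(
                stateBackend.dbOptions,
                restoreInstancePath.getPath(),
                columnFamilyDescriptors,
                columnFamilyHandles)) {
        
            // copy the key-value pairs of each restored column family into the target instance
        }
        // restoreDb is closed here, even if the copy above throws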



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114270508
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
     		}
     	}
     
    +	private static class RocksDBIncrementalSnapshotOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private final CheckpointStreamFactory checkpointStreamFactory;
    +
    +		private final long checkpointId;
    +
    +		private final long checkpointTimestamp;
    +
    +		private Map<String, StreamStateHandle> baseSstFiles;
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
    +
    +		private FileSystem backupFileSystem;
    +		private Path backupPath;
    +
    +		private FSDataInputStream inputStream = null;
    +		private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
    +
    +		// new sst files since the last completed checkpoint
    +		private Set<String> newSstFileNames = new HashSet<>();
    +
    +		// handles to the sst files in the current snapshot
    +		private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
    +
    +		// handles to the misc files in the current snapshot
    +		private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
    +
    +		private StreamStateHandle metaStateHandle = null;
    +
    +		private RocksDBIncrementalSnapshotOperation(
    +				RocksDBKeyedStateBackend<?> stateBackend,
    +				CheckpointStreamFactory checkpointStreamFactory,
    +				long checkpointId,
    +				long checkpointTimestamp) {
    +
    +			this.stateBackend = stateBackend;
    +			this.checkpointStreamFactory = checkpointStreamFactory;
    +			this.checkpointId = checkpointId;
    +			this.checkpointTimestamp = checkpointTimestamp;
    +		}
    +
    +		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
    +			try {
    +				final byte[] buffer = new byte[1024];
    +
    +				FileSystem backupFileSystem = backupPath.getFileSystem();
    +				inputStream = backupFileSystem.open(filePath);
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +					inputStream = null;
    +				}
    +
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		private StreamStateHandle materializeMetaData() throws Exception {
    +			try {
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
    +				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
    +
    +				serializationProxy.write(out);
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		void takeSnapshot() throws Exception {
    +			// use the last completed checkpoint as the comparison base.
    +			baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
    +
    +			// save meta data
    +			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) {
    +
    +				RegisteredBackendStateMetaInfo<?, ?> metaInfo = stateMetaInfoEntry.getValue().f1;
    +
    +				KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy =
    +					new KeyedBackendSerializationProxy.StateMetaInfo<>(
    +						metaInfo.getStateType(),
    +						metaInfo.getName(),
    +						metaInfo.getNamespaceSerializer(),
    +						metaInfo.getStateSerializer());
    +
    +				stateMetaInfos.add(metaInfoProxy);
    +			}
    +
    +			// save state data
    +			backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
    +			backupFileSystem = backupPath.getFileSystem();
    +			if (backupFileSystem.exists(backupPath)) {
    +				LOG.warn("Deleting an existing local checkpoint directory " +
    +					backupPath + ".");
    +
    +				backupFileSystem.delete(backupPath, true);
    +			}
    +
    +			// create hard links of living files in the checkpoint path
    +			Checkpoint checkpoint = Checkpoint.create(stateBackend.db);
    +			checkpoint.createCheckpoint(backupPath.getPath());
    --- End diff --
    
    I guess this will also create the sst file(s) for the current changes (the ones still in the log). I wonder whether this will lead to too many sst files after a while when the snapshot interval is small.



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114287252
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java ---
    @@ -36,7 +36,8 @@
     	 */
     	@Test
     	public void testOrdinalsAreConstant() {
    -		assertEquals(0, CheckpointType.FULL_CHECKPOINT.ordinal());
    -		assertEquals(1, CheckpointType.SAVEPOINT.ordinal());
    +		assertEquals(0, CheckpointType.INCREMENTAL_CHECKPOINT.ordinal());
    +		assertEquals(1, CheckpointType.FULL_CHECKPOINT.ordinal());
    +		assertEquals(2, CheckpointType.SAVEPOINT.ordinal());
    --- End diff --
    
    The purpose of this test is to ensure that the encoding of checkpoint types remains stable for the compatibility of Flink savepoints, and this change would break that compatibility. In this sense, the test should enforce "append-only" for new checkpoint types. To fix the problem, `CheckpointType.INCREMENTAL_CHECKPOINT` should be encoded as value 2 and the other types must be changed back to their old values.
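    
    In other words, the new constant should be appended so that the existing ordinals keep their values, roughly (sketch, not the actual enum definition):
    
        public enum CheckpointType {
            FULL_CHECKPOINT,         // stays at ordinal 0
            SAVEPOINT,               // stays at ordinal 1
            INCREMENTAL_CHECKPOINT   // appended, encoded as 2
        }
        
        // and the test then only gains a new assertion:
        assertEquals(0, CheckpointType.FULL_CHECKPOINT.ordinal());
        assertEquals(1, CheckpointType.SAVEPOINT.ordinal());
        assertEquals(2, CheckpointType.INCREMENTAL_CHECKPOINT.ordinal());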



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114321075
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -152,6 +179,10 @@ public RocksDBKeyedStateBackend(
     	) throws IOException {
     
     		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
    +
    +		this.jobId = jobId;
    --- End diff --
    
    We could introduce precondition checks against `null`.
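    
    For example, in the constructor (sketch; `Preconditions` as used elsewhere in this PR):
    
        this.jobId = Preconditions.checkNotNull(jobId);
        // ... and likewise for the other newly added constructor arguments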



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by shixiaogang <gi...@git.apache.org>.
Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114500597
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
     		}
     	}
     
    +	private static class RocksDBIncrementalSnapshotOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private final CheckpointStreamFactory checkpointStreamFactory;
    +
    +		private final long checkpointId;
    +
    +		private final long checkpointTimestamp;
    +
    +		private Map<String, StreamStateHandle> baseSstFiles;
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
    +
    +		private FileSystem backupFileSystem;
    +		private Path backupPath;
    +
    +		private FSDataInputStream inputStream = null;
    +		private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
    +
    +		// new sst files since the last completed checkpoint
    +		private Set<String> newSstFileNames = new HashSet<>();
    +
    +		// handles to the sst files in the current snapshot
    +		private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
    +
    +		// handles to the misc files in the current snapshot
    +		private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
    +
    +		private StreamStateHandle metaStateHandle = null;
    +
    +		private RocksDBIncrementalSnapshotOperation(
    +				RocksDBKeyedStateBackend<?> stateBackend,
    +				CheckpointStreamFactory checkpointStreamFactory,
    +				long checkpointId,
    +				long checkpointTimestamp) {
    +
    +			this.stateBackend = stateBackend;
    +			this.checkpointStreamFactory = checkpointStreamFactory;
    +			this.checkpointId = checkpointId;
    +			this.checkpointTimestamp = checkpointTimestamp;
    +		}
    +
    +		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
    +			try {
    +				final byte[] buffer = new byte[1024];
    +
    +				FileSystem backupFileSystem = backupPath.getFileSystem();
    +				inputStream = backupFileSystem.open(filePath);
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +					inputStream = null;
    +				}
    +
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		private StreamStateHandle materializeMetaData() throws Exception {
    +			try {
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
    +				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
    +
    +				serializationProxy.write(out);
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		void takeSnapshot() throws Exception {
    +			// use the last completed checkpoint as the comparison base.
    +			baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
    +
    +			// save meta data
    +			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) {
    +
    +				RegisteredBackendStateMetaInfo<?, ?> metaInfo = stateMetaInfoEntry.getValue().f1;
    +
    +				KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy =
    +					new KeyedBackendSerializationProxy.StateMetaInfo<>(
    +						metaInfo.getStateType(),
    +						metaInfo.getName(),
    +						metaInfo.getNamespaceSerializer(),
    +						metaInfo.getStateSerializer());
    +
    +				stateMetaInfos.add(metaInfoProxy);
    +			}
    +
    +			// save state data
    +			backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
    +			backupFileSystem = backupPath.getFileSystem();
    +			if (backupFileSystem.exists(backupPath)) {
    +				LOG.warn("Deleting an existing local checkpoint directory " +
    +					backupPath + ".");
    +
    +				backupFileSystem.delete(backupPath, true);
    +			}
    +
    +			// create hard links of living files in the checkpoint path
    +			Checkpoint checkpoint = Checkpoint.create(stateBackend.db);
    +			checkpoint.createCheckpoint(backupPath.getPath());
    --- End diff --
    
    @StefanRRichter Thanks for your explanation. Your method to allow fast checkpoints is very smart. I think it can help us avoid the small sst files produced by flushing. But I think we should also be careful about flushing the buffer, because it might block processing if the buffer is very big.



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114380645
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException {
     		}
     	}
     
    +	private static class RocksDBIncrementalRestoreOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) {
    +			this.stateBackend = stateBackend;
    +		}
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> readMetaData(
    +				StreamStateHandle metaStateHandle) throws Exception {
    +
    +			FSDataInputStream inputStream = null;
    +
    +			try {
    +				inputStream = metaStateHandle.openInputStream();
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
    +				DataInputView in = new DataInputViewStreamWrapper(inputStream);
    +				serializationProxy.read(in);
    +
    +				return serializationProxy.getNamedStateSerializationProxies();
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +				}
    +			}
    +		}
    +
    +		private void readStateData(
    +				Path restoreFilePath,
    +				StreamStateHandle remoteFileHandle) throws IOException {
    +
    +			FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
    +
    +			FSDataInputStream inputStream = null;
    +			FSDataOutputStream outputStream = null;
    +
    +			try {
    +				inputStream = remoteFileHandle.openInputStream();
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				byte[] buffer = new byte[1024];
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +				}
    +
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +				}
    +			}
    +		}
    +
    +		private void restoreInstance(
    +				RocksDBKeyedStateHandle restoreStateHandle,
    +				boolean hasExtraKeys) throws Exception {
    +
    +			// read state data
    +			Path restoreInstancePath = new Path(
    +				stateBackend.instanceBasePath.getAbsolutePath(),
    +				UUID.randomUUID().toString());
    +
    +			try {
    +				Map<String, StreamStateHandle> sstFiles = restoreStateHandle.getSstFiles();
    +				for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) {
    +					String fileName = sstFileEntry.getKey();
    +					StreamStateHandle remoteFileHandle = sstFileEntry.getValue();
    +
    +					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
    +				}
    +
    +				Map<String, StreamStateHandle> miscFiles = restoreStateHandle.getMiscFiles();
    +				for (Map.Entry<String, StreamStateHandle> miscFileEntry : miscFiles.entrySet()) {
    +					String fileName = miscFileEntry.getKey();
    +					StreamStateHandle remoteFileHandle = miscFileEntry.getValue();
    +
    +					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
    +				}
    +
    +				// read meta data
    +				List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoProxies =
    +					readMetaData(restoreStateHandle.getMetaStateHandle());
    +
    +				List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
    +				columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes(ConfigConstants.DEFAULT_CHARSET)));
    +
    +				for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy : stateMetaInfoProxies) {
    +
    +					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
    +						stateMetaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
    +						stateBackend.columnOptions);
    +
    +					columnFamilyDescriptors.add(columnFamilyDescriptor);
    +				}
    +
    +				if (hasExtraKeys) {
    --- End diff --
    
    I assume you are not using delete-range so that we can support rescaling?



[GitHub] flink issue #3801: [FLINK-6364] [checkpoints] Implement incremental checkpoi...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/3801
  
    I am sorry, but before merging I noticed that some tests (e.g. `RocksDBStateBackendTest.testCancelRunningSnapshot`) fail sporadically (only on Travis). I tracked the problem down, and I think the cause is that the streams are not closed eagerly in `cancel()` to interrupt blocking IO calls.
    
    I suggest the following fix:
    
    `RocksDBIncrementalSnapshotOperation` should have its own `CloseableRegistry`. It tracks all the streams opened inside the checkpointing and is registered with the backend's registry for as long as the task runs. Then, in cancel, as a first step we can close and unregister that inner `CloseableRegistry`. This also prevents the race where the current stream is closed asynchronously by `cancel()` while the checkpointing has actually already opened the next stream (the registry closes all registered streams and blocks new registrations once it is closed).
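    
    A self-contained sketch of that ownership (the class and method names are made up; only `CloseableRegistry` and its `registerClosable`/`unregisterClosable` calls are the real API used elsewhere in this PR):
    
        import org.apache.flink.core.fs.CloseableRegistry;
        
        import java.io.Closeable;
        import java.io.IOException;
        
        final class SnapshotStreamOwnership implements Closeable {
        
            private final CloseableRegistry backendRegistry;   // the backend's cancelStreamRegistry
            private final CloseableRegistry snapshotRegistry = new CloseableRegistry();
        
            SnapshotStreamOwnership(CloseableRegistry backendRegistry) throws IOException {
                this.backendRegistry = backendRegistry;
                // the backend only sees the inner registry, for as long as the snapshot runs
                backendRegistry.registerClosable(snapshotRegistry);
            }
        
            void track(Closeable stream) throws IOException {
                // every stream opened by the snapshot goes here; once the inner registry is
                // closed, this call fails (and closes the stream) instead of leaking it
                snapshotRegistry.registerClosable(stream);
            }
        
            @Override
            public void close() throws IOException {
                // first step in cancel(): eagerly close all open snapshot streams
                backendRegistry.unregisterClosable(snapshotRegistry);
                snapshotRegistry.close();
            }
        }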



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114318544
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java ---
    @@ -0,0 +1,209 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyedStateHandle;
    +import org.apache.flink.runtime.state.SharedStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend}
    + */
    +public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class);
    +
    +	private static final long serialVersionUID = -8328808513197388231L;
    +
    +	private final JobID jobId;
    +
    +	private final String operatorIdentifier;
    +
    +	private final KeyGroupRange keyGroupRange;
    +
    +	private final Set<String> newSstFileNames;
    +
    +	private final Map<String, StreamStateHandle> sstFiles;
    +
    +	private final Map<String, StreamStateHandle> miscFiles;
    +
    +	private final StreamStateHandle metaStateHandle;
    +
    +	private boolean registered;
    --- End diff --
    
    I would suggest a small comment on the meaning of this, because it is actually quite important that this flag tracks which files can be deleted.
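    
    Something along these lines, for example (only a suggested wording, derived from how `discardState()` treats the newly created sst files):
    
        /**
         * True if the sst files of this handle have been registered with a
         * {@link SharedStateRegistry}. Once registered, the new sst files may be
         * referenced by other checkpoints and must not be discarded by this
         * handle alone.
         */
        private boolean registered;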



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114318333
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java ---
    @@ -0,0 +1,209 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyedStateHandle;
    +import org.apache.flink.runtime.state.SharedStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend}
    --- End diff --
    
    I think it would be helpful to describe a bit what this is composed of; for example, what are the misc files?
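    
    Maybe something like this as a starting point (only a suggested wording; I am taking "misc files" to mean the non-sst instance files such as CURRENT and MANIFEST, which belong to a single checkpoint, while the sst files are shared):
    
        /**
         * The handle to the states of an incremental snapshot taken by the
         * {@link RocksDBKeyedStateBackend}. It consists of the meta state handle,
         * the handles to the sst files (which may be shared between checkpoints)
         * and the handles to the remaining instance files (e.g. CURRENT, MANIFEST),
         * which belong to this checkpoint only.
         */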



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114282104
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
     		}
     	}
     
    +	private static class RocksDBIncrementalSnapshotOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private final CheckpointStreamFactory checkpointStreamFactory;
    +
    +		private final long checkpointId;
    +
    +		private final long checkpointTimestamp;
    +
    +		private Map<String, StreamStateHandle> baseSstFiles;
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
    +
    +		private FileSystem backupFileSystem;
    +		private Path backupPath;
    +
    +		private FSDataInputStream inputStream = null;
    +		private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
    +
    +		// new sst files since the last completed checkpoint
    +		private Set<String> newSstFileNames = new HashSet<>();
    +
    +		// handles to the sst files in the current snapshot
    +		private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
    +
    +		// handles to the misc files in the current snapshot
    +		private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
    +
    +		private StreamStateHandle metaStateHandle = null;
    +
    +		private RocksDBIncrementalSnapshotOperation(
    +				RocksDBKeyedStateBackend<?> stateBackend,
    +				CheckpointStreamFactory checkpointStreamFactory,
    +				long checkpointId,
    +				long checkpointTimestamp) {
    +
    +			this.stateBackend = stateBackend;
    +			this.checkpointStreamFactory = checkpointStreamFactory;
    +			this.checkpointId = checkpointId;
    +			this.checkpointTimestamp = checkpointTimestamp;
    +		}
    +
    +		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
    +			try {
    +				final byte[] buffer = new byte[1024];
    +
    +				FileSystem backupFileSystem = backupPath.getFileSystem();
    +				inputStream = backupFileSystem.open(filePath);
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +					inputStream = null;
    +				}
    +
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		private StreamStateHandle materializeMetaData() throws Exception {
    +			try {
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
    +				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
    +
    +				serializationProxy.write(out);
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		void takeSnapshot() throws Exception {
    +			// use the last completed checkpoint as the comparison base.
    +			baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
    +
    +			// save meta data
    +			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) {
    +
    +				RegisteredBackendStateMetaInfo<?, ?> metaInfo = stateMetaInfoEntry.getValue().f1;
    +
    +				KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy =
    +					new KeyedBackendSerializationProxy.StateMetaInfo<>(
    +						metaInfo.getStateType(),
    +						metaInfo.getName(),
    +						metaInfo.getNamespaceSerializer(),
    +						metaInfo.getStateSerializer());
    +
    +				stateMetaInfos.add(metaInfoProxy);
    +			}
    +
    +			// save state data
    +			backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
    +			backupFileSystem = backupPath.getFileSystem();
    +			if (backupFileSystem.exists(backupPath)) {
    +				LOG.warn("Deleting an existing local checkpoint directory " +
    +					backupPath + ".");
    +
    +				backupFileSystem.delete(backupPath, true);
    +			}
    +
    +			// create hard links of living files in the checkpoint path
    +			Checkpoint checkpoint = Checkpoint.create(stateBackend.db);
    +			checkpoint.createCheckpoint(backupPath.getPath());
    --- End diff --
    
    Sorry, I was probably wrong here. I guess the recent changes are in the CURRENT and MANIFEST files, and those are always checkpointed.



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114354639
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
     		}
     	}
     
    +	private static class RocksDBIncrementalSnapshotOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private final CheckpointStreamFactory checkpointStreamFactory;
    +
    +		private final long checkpointId;
    +
    +		private final long checkpointTimestamp;
    +
    +		private Map<String, StreamStateHandle> baseSstFiles;
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
    +
    +		private FileSystem backupFileSystem;
    +		private Path backupPath;
    +
    +		private FSDataInputStream inputStream = null;
    +		private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
    +
    +		// new sst files since the last completed checkpoint
    +		private Set<String> newSstFileNames = new HashSet<>();
    +
    +		// handles to the sst files in the current snapshot
    +		private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
    +
    +		// handles to the misc files in the current snapshot
    +		private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
    +
    +		private StreamStateHandle metaStateHandle = null;
    +
    +		private RocksDBIncrementalSnapshotOperation(
    +				RocksDBKeyedStateBackend<?> stateBackend,
    +				CheckpointStreamFactory checkpointStreamFactory,
    +				long checkpointId,
    +				long checkpointTimestamp) {
    +
    +			this.stateBackend = stateBackend;
    +			this.checkpointStreamFactory = checkpointStreamFactory;
    +			this.checkpointId = checkpointId;
    +			this.checkpointTimestamp = checkpointTimestamp;
    +		}
    +
    +		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
    +			try {
    +				final byte[] buffer = new byte[1024];
    +
    +				FileSystem backupFileSystem = backupPath.getFileSystem();
    +				inputStream = backupFileSystem.open(filePath);
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +					inputStream = null;
    +				}
    +
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		private StreamStateHandle materializeMetaData() throws Exception {
    +			try {
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
    +				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
    +
    +				serializationProxy.write(out);
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		void takeSnapshot() throws Exception {
    +			// use the last completed checkpoint as the comparison base.
    +			baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
    +
    +			// save meta data
    +			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) {
    +
    +				RegisteredBackendStateMetaInfo<?, ?> metaInfo = stateMetaInfoEntry.getValue().f1;
    +
    +				KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy =
    +					new KeyedBackendSerializationProxy.StateMetaInfo<>(
    +						metaInfo.getStateType(),
    +						metaInfo.getName(),
    +						metaInfo.getNamespaceSerializer(),
    +						metaInfo.getStateSerializer());
    +
    +				stateMetaInfos.add(metaInfoProxy);
    +			}
    +
    +			// save state data
    +			backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
    +			backupFileSystem = backupPath.getFileSystem();
    +			if (backupFileSystem.exists(backupPath)) {
    --- End diff --
    
    Assuming that checkpoint ids always increase, this should actually never happen. If it still happens, I wonder whether it is a good idea to simply delete the directory. My suggestion is to a) append a random uid to the path and b) abort the checkpoint instead of deleting if we still encounter an existing folder. What do you think?
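    
    Roughly like this, as a sketch against the code above (the exception type and message are just placeholders):
    
        // a) make the local checkpoint directory name unique
        backupPath = new Path(
            stateBackend.instanceBasePath.getAbsolutePath(),
            "chk-" + checkpointId + "-" + UUID.randomUUID());
        
        // b) abort the checkpoint instead of silently deleting an existing directory
        if (backupFileSystem.exists(backupPath)) {
            throw new IOException("Local checkpoint directory " + backupPath + " already exists.");
        }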



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114318030
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java ---
    @@ -0,0 +1,209 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyedStateHandle;
    +import org.apache.flink.runtime.state.SharedStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend}
    + */
    +public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class);
    +
    +	private static final long serialVersionUID = -8328808513197388231L;
    +
    +	private final JobID jobId;
    +
    +	private final String operatorIdentifier;
    +
    +	private final KeyGroupRange keyGroupRange;
    +
    +	private final Set<String> newSstFileNames;
    +
    +	private final Map<String, StreamStateHandle> sstFiles;
    +
    +	private final Map<String, StreamStateHandle> miscFiles;
    +
    +	private final StreamStateHandle metaStateHandle;
    +
    +	private boolean registered;
    +
    +	RocksDBKeyedStateHandle(
    +			JobID jobId,
    +			String operatorIdentifier,
    +			KeyGroupRange keyGroupRange,
    +			Set<String> newSstFileNames,
    +			Map<String, StreamStateHandle> sstFiles,
    +			Map<String, StreamStateHandle> miscFiles,
    +			StreamStateHandle metaStateHandle) {
    +
    +		this.jobId = jobId;
    --- End diff --
    
    We could check for some notNull preconditions in the ctor.



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114361888
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
     		}
     	}
     
    +	private static class RocksDBIncrementalSnapshotOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private final CheckpointStreamFactory checkpointStreamFactory;
    +
    +		private final long checkpointId;
    +
    +		private final long checkpointTimestamp;
    +
    +		private Map<String, StreamStateHandle> baseSstFiles;
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
    +
    +		private FileSystem backupFileSystem;
    +		private Path backupPath;
    +
    +		private FSDataInputStream inputStream = null;
    +		private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
    +
    +		// new sst files since the last completed checkpoint
    +		private Set<String> newSstFileNames = new HashSet<>();
    +
    +		// handles to the sst files in the current snapshot
    +		private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
    +
    +		// handles to the misc files in the current snapshot
    +		private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
    +
    +		private StreamStateHandle metaStateHandle = null;
    +
    +		private RocksDBIncrementalSnapshotOperation(
    +				RocksDBKeyedStateBackend<?> stateBackend,
    +				CheckpointStreamFactory checkpointStreamFactory,
    +				long checkpointId,
    +				long checkpointTimestamp) {
    +
    +			this.stateBackend = stateBackend;
    +			this.checkpointStreamFactory = checkpointStreamFactory;
    +			this.checkpointId = checkpointId;
    +			this.checkpointTimestamp = checkpointTimestamp;
    +		}
    +
    +		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
    +			try {
    +				final byte[] buffer = new byte[1024];
    +
    +				FileSystem backupFileSystem = backupPath.getFileSystem();
    +				inputStream = backupFileSystem.open(filePath);
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +
    +				return outputStream.closeAndGetHandle();
    --- End diff --
    
    We could set `outputStream` to `null` before returning, to avoid a second call to `close()` in the `finally` clause.
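    
    For example (sketch, same variables as in the diff above; this also keeps the unregistration from the cancel registry next to the close):
    
        stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
        StreamStateHandle result = outputStream.closeAndGetHandle();
        outputStream = null;   // the finally block then skips the second close()
        return result;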



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114579898
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java ---
    @@ -0,0 +1,209 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyedStateHandle;
    +import org.apache.flink.runtime.state.SharedStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend}
    + */
    +public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class);
    +
    +	private static final long serialVersionUID = -8328808513197388231L;
    +
    +	private final JobID jobId;
    +
    +	private final String operatorIdentifier;
    +
    +	private final KeyGroupRange keyGroupRange;
    +
    +	private final Set<String> newSstFileNames;
    +
    +	private final Map<String, StreamStateHandle> sstFiles;
    +
    +	private final Map<String, StreamStateHandle> miscFiles;
    +
    +	private final StreamStateHandle metaStateHandle;
    +
    +	private boolean registered;
    +
    +	RocksDBKeyedStateHandle(
    +			JobID jobId,
    +			String operatorIdentifier,
    +			KeyGroupRange keyGroupRange,
    +			Set<String> newSstFileNames,
    +			Map<String, StreamStateHandle> sstFiles,
    +			Map<String, StreamStateHandle> miscFiles,
    +			StreamStateHandle metaStateHandle) {
    +
    +		this.jobId = jobId;
    +		this.operatorIdentifier = operatorIdentifier;
    +		this.keyGroupRange = keyGroupRange;
    +		this.newSstFileNames = newSstFileNames;
    +		this.sstFiles = sstFiles;
    +		this.miscFiles = miscFiles;
    +		this.metaStateHandle = metaStateHandle;
    +		this.registered = false;
    +	}
    +
    +	@Override
    +	public KeyGroupRange getKeyGroupRange() {
    +		return keyGroupRange;
    +	}
    +
    +	public Map<String, StreamStateHandle> getSstFiles() {
    +		return sstFiles;
    +	}
    +
    +	public Map<String, StreamStateHandle> getMiscFiles() {
    +		return miscFiles;
    +	}
    +
    +	public StreamStateHandle getMetaStateHandle() {
    +		return metaStateHandle;
    +	}
    +
    +	@Override
    +	public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
    +		if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
    +			return this;
    +		} else {
    +			return null;
    +		}
    +	}
    +
    +	@Override
    +	public void discardState() throws Exception {
    +
    +		try {
    +			metaStateHandle.discardState();
    +		} catch (Exception e) {
    +			LOG.warn("Could not properly discard meta data.", e);
    +		}
    +
    +		try {
    +			StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values());
    +		} catch (Exception e) {
    +			LOG.warn("Could not properly discard misc file state.", e);
    +		}
    +
    +		if (!registered) {
    +			for (String newSstFileName : newSstFileNames) {
    +				StreamStateHandle handle = sstFiles.get(newSstFileName);
    +				try {
    +					handle.discardState();
    +				} catch (Exception e) {
    +					LOG.warn("Could not properly discard sst file state", e);
    +				}
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public long getStateSize() {
    +		long size = StateUtil.getStateSize(metaStateHandle);
    +
    +		for (StreamStateHandle sstFileHandle : sstFiles.values()) {
    +			size += sstFileHandle.getStateSize();
    +		}
    +
    +		for (StreamStateHandle miscFileHandle : miscFiles.values()) {
    +			size += miscFileHandle.getStateSize();
    +		}
    +
    +		return size;
    +	}
    +
    +	@Override
    +	public void registerSharedStates(SharedStateRegistry stateRegistry) {
    +		Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
    +
    +		for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) {
    +			SstFileStateHandle stateHandle = new SstFileStateHandle(sstFileEntry.getKey(), sstFileEntry.getValue());
    +
    +			int referenceCount = stateRegistry.register(stateHandle);
    +
    +			if (newSstFileNames.contains(sstFileEntry.getKey())) {
    +				Preconditions.checkState(referenceCount == 1);
    +			} else {
    +				Preconditions.checkState(referenceCount > 1);
    +			}
    +		}
    +
    +		registered = true;
    +	}
    +
    +	@Override
    +	public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
    +		Preconditions.checkState(registered, "The state handle has not registered its shared states yet.");
    +
    +		for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) {
    +			stateRegistry.unregister(new SstFileStateHandle(sstFileEntry.getKey(), sstFileEntry.getValue()));
    +		}
    +
    +		registered = false;
    +	}
    +
    +	private class SstFileStateHandle implements SharedStateHandle {
    --- End diff --
    
    Yes, we can do that later.



[GitHub] flink issue #3801: [FLINK-6364] [checkpoints] Implement incremental checkpoi...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the issue:

    https://github.com/apache/flink/pull/3801
  
    Hi
    Thanks for the nice effort!
    
    I only skimmed through the changes to get the main idea (I will do a more thorough review in the next days) but I have some initial questions :)
    
    1. You mentioned, and it looks like from the changes, that incremental snapshots cannot be rescaled, as they checkpoint the files for multiple key groups together. How is this constraint enforced? Will the job fail if we try to rescale it?
    
    2. When you restore from a full snapshot will the next incremental snapshot contain all the files or just the diffs?
    
    3. How do you plan for users to trigger the incremental snapshot operation? I guess the question goes for both checkpoints and savepoints. For example, every n-th checkpoint could be a full snapshot. And users could maybe use a flag to indicate whether the savepoint should be incremental?
    
    Cheers,
    Gyula



[GitHub] flink issue #3801: [FLINK-6364] [checkpoints] Implement incremental checkpoi...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the issue:

    https://github.com/apache/flink/pull/3801
  
    sweet



[GitHub] flink issue #3801: [FLINK-6364] [checkpoints] Implement incremental checkpoi...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/3801
  
    I merged your changes. Please close this PR and the JIRA.



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114286771
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
    @@ -769,9 +769,10 @@ public OperatorStateBackend createOperatorStateBackend(
     		cancelables.registerClosable(keyedStateBackend);
     
     		// restore if we have some old state
    -		if (null != restoreStateHandles && null != restoreStateHandles.getManagedKeyedState()) {
    -			keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState());
    -		}
    +		Collection<KeyedStateHandle> restoreKeyedStateHandles =
    +			restoreStateHandles == null ? null : restoreStateHandles.getManagedKeyedState();
    +
    +		keyedStateBackend.restore(restoreKeyedStateHandles);
    --- End diff --
    
    I think this would be a good opportunity to switch from lazy restore to eagerly passing state handles to the factory method for the keyed state backend implementations. The factory, in turn, can eagerly pass the restore state to the constructor.
    I think this could be done in a followup PR.
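    
    As a rough illustration of the suggested direction (the names below are made up for this sketch and are not the actual Flink factory API), the factory could receive the restore state eagerly:
    
        import java.util.Collection;
        
        import org.apache.flink.runtime.state.KeyedStateHandle;
        
        // Illustrative shape only: the factory receives the restore state eagerly and
        // can hand it straight to the backend's constructor, removing the separate
        // lazy restore() call.
        interface EagerRestoreBackendFactory<B> {
        
            B createKeyedStateBackend(Collection<KeyedStateHandle> restoreStateHandles) throws Exception;
        }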



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114368960
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -137,6 +156,14 @@
     	/** Number of bytes required to prefix the key groups. */
     	private final int keyGroupPrefixBytes;
     
    +	/** The sst files materialized in pending checkpoints */
    +	private final SortedMap<Long, Map<String, StreamStateHandle>> materializedSstFiles = new TreeMap<>();
    +
    +	/** The identifier of the last completed checkpoint */
    +	private final long lastCompletedCheckpointId = -1;
    --- End diff --
    
    Currently, this value `lastCompletedCheckpointId` is not maintained at all, and also the `materializedSstFiles` is ever-growing. I think the whole feedback from the checkpoint coordinator about completed checkpoints is still missing. Are you planning to do this in another PR?
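    
    For illustration, a minimal sketch of what such feedback could look like once completed-checkpoint notifications reach the backend; the field names mirror the ones in the diff, but the notification hook itself is an assumption, not existing code:
    
        import java.util.Map;
        import java.util.SortedMap;
        import java.util.TreeMap;
        
        import org.apache.flink.runtime.state.StreamStateHandle;
        
        final class CompletedCheckpointBookkeepingSketch {
        
            /** The sst files materialized in pending checkpoints, keyed by checkpoint id. */
            private final SortedMap<Long, Map<String, StreamStateHandle>> materializedSstFiles = new TreeMap<>();
        
            /** The identifier of the last completed checkpoint. */
            private long lastCompletedCheckpointId = -1;
        
            // Hypothetical hook invoked when the checkpoint coordinator confirms a checkpoint:
            // remember the completed id and drop the bookkeeping of older, superseded
            // checkpoints so that materializedSstFiles does not grow without bound.
            synchronized void notifyCheckpointComplete(long completedCheckpointId) {
                lastCompletedCheckpointId = completedCheckpointId;
                materializedSstFiles.headMap(completedCheckpointId).clear();
            }
        }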



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114365154
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException {
     		}
     	}
     
    +	private static class RocksDBIncrementalRestoreOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) {
    +			this.stateBackend = stateBackend;
    +		}
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> readMetaData(
    +				StreamStateHandle metaStateHandle) throws Exception {
    +
    +			FSDataInputStream inputStream = null;
    +
    +			try {
    +				inputStream = metaStateHandle.openInputStream();
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
    +				DataInputView in = new DataInputViewStreamWrapper(inputStream);
    +				serializationProxy.read(in);
    +
    +				return serializationProxy.getNamedStateSerializationProxies();
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +				}
    +			}
    +		}
    +
    +		private void readStateData(
    +				Path restoreFilePath,
    +				StreamStateHandle remoteFileHandle) throws IOException {
    +
    +			FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
    +
    +			FSDataInputStream inputStream = null;
    +			FSDataOutputStream outputStream = null;
    +
    +			try {
    +				inputStream = remoteFileHandle.openInputStream();
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				byte[] buffer = new byte[1024];
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +				}
    +
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +				}
    +			}
    +		}
    +
    +		private void restoreInstance(
    +				RocksDBKeyedStateHandle restoreStateHandle,
    +				boolean hasExtraKeys) throws Exception {
    +
    +			// read state data
    +			Path restoreInstancePath = new Path(
    +				stateBackend.instanceBasePath.getAbsolutePath(),
    +				UUID.randomUUID().toString());
    +
    +			try {
    +				Map<String, StreamStateHandle> sstFiles = restoreStateHandle.getSstFiles();
    +				for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) {
    +					String fileName = sstFileEntry.getKey();
    +					StreamStateHandle remoteFileHandle = sstFileEntry.getValue();
    +
    +					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
    +				}
    +
    +				Map<String, StreamStateHandle> miscFiles = restoreStateHandle.getMiscFiles();
    +				for (Map.Entry<String, StreamStateHandle> miscFileEntry : miscFiles.entrySet()) {
    +					String fileName = miscFileEntry.getKey();
    +					StreamStateHandle remoteFileHandle = miscFileEntry.getValue();
    +
    +					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
    +				}
    +
    +				// read meta data
    +				List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoProxies =
    +					readMetaData(restoreStateHandle.getMetaStateHandle());
    +
    +				List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
    +				columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes(ConfigConstants.DEFAULT_CHARSET)));
    +
    +				for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy : stateMetaInfoProxies) {
    +
    +					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
    +						stateMetaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
    +						stateBackend.columnOptions);
    +
    +					columnFamilyDescriptors.add(columnFamilyDescriptor);
    +				}
    +
    +				if (hasExtraKeys) {
    +
    +					List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
    +
    +					RocksDB restoreDb = RocksDB.open(
    +						stateBackend.dbOptions,
    +						restoreInstancePath.getPath(),
    +						columnFamilyDescriptors,
    +						columnFamilyHandles);
    +
    +					for (int i = 1; i < columnFamilyHandles.size(); ++i) {
    +						ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
    +						ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i);
    +						KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy = stateMetaInfoProxies.get(i - 1);
    +
    +						Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
    +							stateBackend.kvStateInformation.get(stateMetaInfoProxy.getStateName());
    +
    +						if (null == registeredStateMetaInfoEntry) {
    +
    +							RegisteredBackendStateMetaInfo<?, ?> stateMetaInfo = new RegisteredBackendStateMetaInfo<>(stateMetaInfoProxy);
    +
    +							registeredStateMetaInfoEntry =
    +								new Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>(
    +									stateBackend.db.createColumnFamily(columnFamilyDescriptor),
    +									stateMetaInfo);
    +
    +							stateBackend.kvStateInformation.put(stateMetaInfoProxy.getStateName(), registeredStateMetaInfoEntry);
    +						}
    +
    +						ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;
    +
    +						try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) {
    +
    +							iterator.seekToFirst();
    --- End diff --
    
    Instead of `seekToFirst`, can we not seek to the first key-group in the backend's range (via `seek(keygroupPrefixBytes)`) to potentially save some entries?
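    
    For example, a small sketch of such a seek, under the assumption that each key starts with the key-group id written big-endian into the first `keyGroupPrefixBytes` bytes:
    
        import org.rocksdb.RocksIterator;
        
        final class SeekToFirstKeyGroupSketch {
        
            // Position the iterator at the first entry of the given key group instead of
            // the very first entry of the column family (big-endian prefix layout assumed).
            static void seekToKeyGroup(RocksIterator iterator, int firstKeyGroup, int keyGroupPrefixBytes) {
                byte[] prefix = new byte[keyGroupPrefixBytes];
                for (int i = 0; i < keyGroupPrefixBytes; i++) {
                    prefix[i] = (byte) (firstKeyGroup >>> (8 * (keyGroupPrefixBytes - 1 - i)));
                }
                iterator.seek(prefix);
            }
        }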



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114325070
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -265,9 +281,64 @@ private boolean hasRegisteredState() {
     			final CheckpointStreamFactory streamFactory,
     			CheckpointOptions checkpointOptions) throws Exception {
     
    +		if (checkpointOptions.getCheckpointType() == CheckpointOptions.CheckpointType.INCREMENTAL_CHECKPOINT) {
    +			return snapshotIncrementally(checkpointId, timestamp, streamFactory);
    +		} else {
    +			return snapshotFully(checkpointId, timestamp, streamFactory);
    +		}
    +	}
    +
    +	private RunnableFuture<KeyedStateHandle> snapshotIncrementally(
    +			final long checkpointId,
    --- End diff --
    
    This method duplicates a fair amount of code from `snapshotFully`. My suggestion would be to target an abstract strategy common to all checkpointing, e.g. `takeSnapshot`, `materialize`, and `releaseResources`. Then we could have a factory that, depending on the checkpoint type, instantiates the right implementation. This would also help with a second concern: the code in this class has grown a lot over time, so this could be the time to move aspects like the checkpointing strategies into separate classes (they are already static inner classes anyway). By splitting between processing logic and snapshot/restore logic, we keep things more modular and preserve the separation of concerns.
    
    We can also do this cleanup in a followup PR.
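    
    For illustration, a minimal sketch of the suggested abstraction; the names are illustrative and not an existing Flink interface:
    
        import org.apache.flink.runtime.state.KeyedStateHandle;
        
        // Illustrative only: a common strategy interface that a factory could instantiate
        // per checkpoint type (full vs. incremental).
        interface SnapshotStrategySketch {
        
            // Synchronous part: capture a consistent view of the backend, e.g. a RocksDB
            // checkpoint of hard-linked files for the incremental case.
            void takeSnapshot(long checkpointId, long checkpointTimestamp) throws Exception;
        
            // Asynchronous part: upload the captured data and produce the resulting handle.
            KeyedStateHandle materialize() throws Exception;
        
            // Cleanup of local resources; if canceled, also discard already uploaded state.
            void releaseResources(boolean canceled);
        }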



[GitHub] flink issue #3801: [FLINK-6364] [checkpoints] Implement incremental checkpoi...

Posted by shixiaogang <gi...@git.apache.org>.
Github user shixiaogang commented on the issue:

    https://github.com/apache/flink/pull/3801
  
    Hi @gyfora I am very happy to hear from you. The following are the answers to your questions. Please let me know if you have any thoughts on them.
    
    1. Incremental checkpoints support rescaling. It is true that the implementation checkpoints files that contain data of multiple key groups together. But in the cases where the degree of parallelism changes, the files will be passed to all the state backends whose key groups are contained in the files. Each backend then iterates over all the key-value pairs in the files and picks up those pairs that belong to its own key groups (a sketch of this filtering follows below).
    
    2. In the cases where we restore from a full snapshot (which is formatted as key-value pairs), the next incremental checkpoint will contain all the files. It may seem a little inefficient, but I intend to make each checkpoint self-contained. Given that full snapshots and incremental snapshots are in different formats, we have to take a "full" incremental snapshot as the base for the following checkpoints.
    
    3. That is a very good question. It would be flexible if users could choose the checkpointing scheme (say, one full checkpoint after every n incremental checkpoints). But I think making every checkpoint incremental is acceptable because incremental checkpoints are more efficient in most cases. Those backends which do not support incremental checkpointing can still take full snapshots.
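    
    For illustration of the filtering mentioned in answer 1, here is a minimal sketch; it assumes the key-group id is encoded big-endian in the first `keyGroupPrefixBytes` bytes of every key, and it is not the PR's actual restore code:
    
        import org.rocksdb.ColumnFamilyHandle;
        import org.rocksdb.RocksDB;
        import org.rocksdb.RocksDBException;
        import org.rocksdb.RocksIterator;
        
        final class KeyGroupFilteringSketch {
        
            // Copy only the entries whose key group falls into [firstKeyGroup, lastKeyGroup]
            // from a restored instance into the backend's own RocksDB instance.
            static void copyOwnedEntries(
                    RocksDB restoreDb, ColumnFamilyHandle sourceHandle,
                    RocksDB targetDb, ColumnFamilyHandle targetHandle,
                    int keyGroupPrefixBytes, int firstKeyGroup, int lastKeyGroup) throws RocksDBException {
        
                try (RocksIterator iterator = restoreDb.newIterator(sourceHandle)) {
                    for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                        int keyGroup = keyGroupOf(iterator.key(), keyGroupPrefixBytes);
                        if (keyGroup >= firstKeyGroup && keyGroup <= lastKeyGroup) {
                            targetDb.put(targetHandle, iterator.key(), iterator.value());
                        }
                    }
                }
            }
        
            // Extract the key-group id from the (assumed big-endian) key prefix.
            private static int keyGroupOf(byte[] rocksKey, int keyGroupPrefixBytes) {
                int keyGroup = 0;
                for (int i = 0; i < keyGroupPrefixBytes; i++) {
                    keyGroup = (keyGroup << 8) | (rocksKey[i] & 0xFF);
                }
                return keyGroup;
            }
        }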



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by shixiaogang <gi...@git.apache.org>.
Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114703775
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
     		}
     	}
     
    +	private static class RocksDBIncrementalSnapshotOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private final CheckpointStreamFactory checkpointStreamFactory;
    +
    +		private final long checkpointId;
    +
    +		private final long checkpointTimestamp;
    +
    +		private Map<String, StreamStateHandle> baseSstFiles;
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
    +
    +		private FileSystem backupFileSystem;
    +		private Path backupPath;
    +
    +		private FSDataInputStream inputStream = null;
    +		private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
    +
    +		// new sst files since the last completed checkpoint
    +		private Set<String> newSstFileNames = new HashSet<>();
    +
    +		// handles to the sst files in the current snapshot
    +		private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
    +
    +		// handles to the misc files in the current snapshot
    +		private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
    +
    +		private StreamStateHandle metaStateHandle = null;
    +
    +		private RocksDBIncrementalSnapshotOperation(
    +				RocksDBKeyedStateBackend<?> stateBackend,
    +				CheckpointStreamFactory checkpointStreamFactory,
    +				long checkpointId,
    +				long checkpointTimestamp) {
    +
    +			this.stateBackend = stateBackend;
    +			this.checkpointStreamFactory = checkpointStreamFactory;
    +			this.checkpointId = checkpointId;
    +			this.checkpointTimestamp = checkpointTimestamp;
    +		}
    +
    +		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
    +			try {
    +				final byte[] buffer = new byte[1024];
    +
    +				FileSystem backupFileSystem = backupPath.getFileSystem();
    +				inputStream = backupFileSystem.open(filePath);
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +					inputStream = null;
    +				}
    +
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		private StreamStateHandle materializeMetaData() throws Exception {
    +			try {
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
    +				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
    +
    +				serializationProxy.write(out);
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		void takeSnapshot() throws Exception {
    +			// use the last completed checkpoint as the comparison base.
    +			baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
    +
    +			// save meta data
    +			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) {
    +
    +				RegisteredBackendStateMetaInfo<?, ?> metaInfo = stateMetaInfoEntry.getValue().f1;
    +
    +				KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy =
    +					new KeyedBackendSerializationProxy.StateMetaInfo<>(
    +						metaInfo.getStateType(),
    +						metaInfo.getName(),
    +						metaInfo.getNamespaceSerializer(),
    +						metaInfo.getStateSerializer());
    +
    +				stateMetaInfos.add(metaInfoProxy);
    +			}
    +
    +			// save state data
    +			backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
    +			backupFileSystem = backupPath.getFileSystem();
    +			if (backupFileSystem.exists(backupPath)) {
    +				LOG.warn("Deleting an existing local checkpoint directory " +
    +					backupPath + ".");
    +
    +				backupFileSystem.delete(backupPath, true);
    +			}
    +
    +			// create hard links of living files in the checkpoint path
    +			Checkpoint checkpoint = Checkpoint.create(stateBackend.db);
    +			checkpoint.createCheckpoint(backupPath.getPath());
    +		}
    +
    +		KeyedStateHandle materializeSnapshot() throws Exception {
    +			// write meta data
    +			metaStateHandle = materializeMetaData();
    +
    +			// write state data
    +			Preconditions.checkState(backupFileSystem.exists(backupPath));
    +
    +			FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath);
    +			if (fileStatuses != null ) {
    +				for (FileStatus fileStatus : fileStatuses) {
    +					Path filePath = fileStatus.getPath();
    +					String fileName = filePath.getName();
    +
    +					if (fileName.endsWith(SST_FILE_SUFFIX)) {
    +						StreamStateHandle fileHandle =
    +							baseSstFiles == null ? null : baseSstFiles.get(fileName);
    +
    +						if (fileHandle == null) {
    +							newSstFileNames.add(fileName);
    +							fileHandle = materializeStateData(filePath);
    +						}
    +
    +						sstFiles.put(fileName, fileHandle);
    +					} else {
    +						StreamStateHandle fileHandle = materializeStateData(filePath);
    +						miscFiles.put(fileName, fileHandle);
    +					}
    +				}
    +			}
    +
    +			stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
    +
    +			return new RocksDBKeyedStateHandle(stateBackend.jobId,
    +				stateBackend.operatorIdentifier, stateBackend.keyGroupRange,
    +				newSstFileNames, sstFiles, miscFiles, metaStateHandle);
    +		}
    +
    +		void releaseResources(boolean canceled) {
    +
    +			if (inputStream != null) {
    +				stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +				try {
    +					inputStream.close();
    +				} catch (Exception e) {
    +					LOG.warn("Could not properly close the input stream.", e);
    +				}
    +				inputStream = null;
    +			}
    +
    +			if (outputStream != null) {
    +				stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +				try {
    +					outputStream.close();
    +				} catch (Exception e) {
    +					LOG.warn("Could not properly close the output stream.", e);
    +				}
    +				outputStream = null;
    +			}
    +
    +			if (backupPath != null) {
    +				try {
    +					if (backupFileSystem.exists(backupPath)) {
    +						backupFileSystem.delete(backupPath, true);
    +					}
    +				} catch (Exception e) {
    +					LOG.warn("Could not properly delete the checkpoint directory.", e);
    +				}
    +			}
    +
    +			if (canceled) {
    +				List<StateObject> statesToDiscard = new ArrayList<>();
    +
    +				if (metaStateHandle != null) {
    +					statesToDiscard.add(metaStateHandle);
    +				}
    +
    +				statesToDiscard.addAll(miscFiles.values());
    +
    +				for (String newSstFileName : newSstFileNames) {
    +					StreamStateHandle fileHandle = sstFiles.get(newSstFileName);
    +					if (fileHandle != null) {
    +						statesToDiscard.add(fileHandle);
    +					}
    +				}
    +
    +				try {
    +					StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
    --- End diff --
    
    You are absolutely right. Global cleanup hooks are urgently needed to remove unused state. The hook should live at the job manager rather than in the shared state registry, because unused private state should be cleaned up as well. At startup, the hook will know the checkpoint from which we are restoring and will only retain the data of the restored completed checkpoints.
    
    The local checkpointing directories will be deleted once the checkpoint completes at the TM. Since the local checkpoint directories are all located under the backend's directory, which is deleted when the backend is disposed, they will also be removed if the backend is closed correctly.
    
    But in the cases where the TM fails while the backend is being closed, the local checkpoint directories will be left on the file system. The problem does not matter in Yarn clusters but may be very severe in standalone clusters. What do you think of the problem?
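    
    As a small, hedged sketch of the kind of best-effort cleanup discussed here (the "chk-" directory prefix comes from the diff; everything else is an assumption), leftover local checkpoint directories could be swept when a backend starts up on the same machine:
    
        import java.io.File;
        
        final class OrphanedCheckpointDirCleanupSketch {
        
            // Best-effort removal of leftover local checkpoint directories ("chk-*")
            // under the backend's instance base path, e.g. invoked when a new backend
            // is created after a TM failure.
            static void cleanUpOrphanedCheckpointDirs(File instanceBasePath) {
                File[] children = instanceBasePath.listFiles();
                if (children == null) {
                    return;
                }
                for (File child : children) {
                    if (child.isDirectory() && child.getName().startsWith("chk-")) {
                        deleteRecursively(child);
                    }
                }
            }
        
            private static void deleteRecursively(File file) {
                File[] children = file.listFiles();
                if (children != null) {
                    for (File child : children) {
                        deleteRecursively(child);
                    }
                }
                // Best effort: ignore the result, a failed deletion can be retried later.
                file.delete();
            }
        }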



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114288802
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java ---
    @@ -0,0 +1,209 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyedStateHandle;
    +import org.apache.flink.runtime.state.SharedStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend}
    + */
    +public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class);
    +
    +	private static final long serialVersionUID = -8328808513197388231L;
    +
    +	private final JobID jobId;
    +
    +	private final String operatorIdentifier;
    +
    +	private final KeyGroupRange keyGroupRange;
    +
    +	private final Set<String> newSstFileNames;
    +
    +	private final Map<String, StreamStateHandle> sstFiles;
    +
    +	private final Map<String, StreamStateHandle> miscFiles;
    +
    +	private final StreamStateHandle metaStateHandle;
    +
    +	private boolean registered;
    +
    +	RocksDBKeyedStateHandle(
    +			JobID jobId,
    +			String operatorIdentifier,
    +			KeyGroupRange keyGroupRange,
    +			Set<String> newSstFileNames,
    +			Map<String, StreamStateHandle> sstFiles,
    +			Map<String, StreamStateHandle> miscFiles,
    +			StreamStateHandle metaStateHandle) {
    +
    +		this.jobId = jobId;
    +		this.operatorIdentifier = operatorIdentifier;
    +		this.keyGroupRange = keyGroupRange;
    +		this.newSstFileNames = newSstFileNames;
    +		this.sstFiles = sstFiles;
    +		this.miscFiles = miscFiles;
    +		this.metaStateHandle = metaStateHandle;
    +		this.registered = false;
    +	}
    +
    +	@Override
    +	public KeyGroupRange getKeyGroupRange() {
    +		return keyGroupRange;
    +	}
    +
    +	public Map<String, StreamStateHandle> getSstFiles() {
    +		return sstFiles;
    +	}
    +
    +	public Map<String, StreamStateHandle> getMiscFiles() {
    +		return miscFiles;
    +	}
    +
    +	public StreamStateHandle getMetaStateHandle() {
    +		return metaStateHandle;
    +	}
    +
    +	@Override
    +	public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
    +		if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
    +			return this;
    +		} else {
    +			return null;
    +		}
    +	}
    +
    +	@Override
    +	public void discardState() throws Exception {
    +
    +		try {
    +			metaStateHandle.discardState();
    +		} catch (Exception e) {
    +			LOG.warn("Could not properly discard meta data.", e);
    +		}
    +
    +		try {
    +			StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values());
    +		} catch (Exception e) {
    +			LOG.warn("Could not properly discard misc file state.", e);
    +		}
    +
    +		if (!registered) {
    +			for (String newSstFileName : newSstFileNames) {
    +				StreamStateHandle handle = sstFiles.get(newSstFileName);
    +				try {
    +					handle.discardState();
    +				} catch (Exception e) {
    +					LOG.warn("Could not properly discard sst file state", e);
    +				}
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public long getStateSize() {
    +		long size = StateUtil.getStateSize(metaStateHandle);
    +
    +		for (StreamStateHandle sstFileHandle : sstFiles.values()) {
    +			size += sstFileHandle.getStateSize();
    +		}
    +
    +		for (StreamStateHandle miscFileHandle : miscFiles.values()) {
    +			size += miscFileHandle.getStateSize();
    +		}
    +
    +		return size;
    +	}
    +
    +	@Override
    +	public void registerSharedStates(SharedStateRegistry stateRegistry) {
    +		Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
    +
    +		for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) {
    +			SstFileStateHandle stateHandle = new SstFileStateHandle(sstFileEntry.getKey(), sstFileEntry.getValue());
    +
    +			int referenceCount = stateRegistry.register(stateHandle);
    +
    +			if (newSstFileNames.contains(sstFileEntry.getKey())) {
    +				Preconditions.checkState(referenceCount == 1);
    +			} else {
    +				Preconditions.checkState(referenceCount > 1);
    +			}
    +		}
    +
    +		registered = true;
    +	}
    +
    +	@Override
    +	public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
    +		Preconditions.checkState(registered, "The state handle has not registered its shared states yet.");
    +
    +		for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) {
    +			stateRegistry.unregister(new SstFileStateHandle(sstFileEntry.getKey(), sstFileEntry.getValue()));
    +		}
    +
    +		registered = false;
    +	}
    +
    +	private class SstFileStateHandle implements SharedStateHandle {
    --- End diff --
    
    Looking a bit ahead, the concept of a state handle to shared files might be something to reuse in other future cases, e.g. imagine incremental snapshots for the heap-based backend. I suggest moving this into a full class named `SharedFileStateHandle`. Conceptually and implementation-wise, this could be `extends FileStateHandle implements SharedStateHandle`.



[GitHub] flink issue #3801: [FLINK-6364] [checkpoints] Implement incremental checkpoi...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/3801
  
    @shixiaogang This looks good to me, I will go ahead and merge this. Thanks for your work!



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114364811
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException {
     		}
     	}
     
    +	private static class RocksDBIncrementalRestoreOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) {
    +			this.stateBackend = stateBackend;
    +		}
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> readMetaData(
    +				StreamStateHandle metaStateHandle) throws Exception {
    +
    +			FSDataInputStream inputStream = null;
    +
    +			try {
    +				inputStream = metaStateHandle.openInputStream();
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
    +				DataInputView in = new DataInputViewStreamWrapper(inputStream);
    +				serializationProxy.read(in);
    +
    +				return serializationProxy.getNamedStateSerializationProxies();
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +				}
    +			}
    +		}
    +
    +		private void readStateData(
    +				Path restoreFilePath,
    +				StreamStateHandle remoteFileHandle) throws IOException {
    +
    +			FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
    +
    +			FSDataInputStream inputStream = null;
    +			FSDataOutputStream outputStream = null;
    +
    +			try {
    +				inputStream = remoteFileHandle.openInputStream();
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				byte[] buffer = new byte[1024];
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +				}
    +
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +				}
    +			}
    +		}
    +
    +		private void restoreInstance(
    +				RocksDBKeyedStateHandle restoreStateHandle,
    +				boolean hasExtraKeys) throws Exception {
    +
    +			// read state data
    +			Path restoreInstancePath = new Path(
    +				stateBackend.instanceBasePath.getAbsolutePath(),
    +				UUID.randomUUID().toString());
    +
    +			try {
    +				Map<String, StreamStateHandle> sstFiles = restoreStateHandle.getSstFiles();
    +				for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) {
    +					String fileName = sstFileEntry.getKey();
    +					StreamStateHandle remoteFileHandle = sstFileEntry.getValue();
    +
    +					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
    +				}
    +
    +				Map<String, StreamStateHandle> miscFiles = restoreStateHandle.getMiscFiles();
    +				for (Map.Entry<String, StreamStateHandle> miscFileEntry : miscFiles.entrySet()) {
    +					String fileName = miscFileEntry.getKey();
    +					StreamStateHandle remoteFileHandle = miscFileEntry.getValue();
    +
    +					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
    +				}
    +
    +				// read meta data
    +				List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoProxies =
    +					readMetaData(restoreStateHandle.getMetaStateHandle());
    +
    +				List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
    +				columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes(ConfigConstants.DEFAULT_CHARSET)));
    +
    +				for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy : stateMetaInfoProxies) {
    +
    +					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
    +						stateMetaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
    +						stateBackend.columnOptions);
    +
    +					columnFamilyDescriptors.add(columnFamilyDescriptor);
    +				}
    +
    +				if (hasExtraKeys) {
    --- End diff --
    
    I wonder if we could prune key-groups based on this: https://github.com/facebook/rocksdb/wiki/Delete-A-Range-Of-Keys.
    
    If not, would it make sense to batch the inserts using the multi-put feature?
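    
    For reference, a minimal sketch of how the inserts could be batched with RocksJava's `WriteBatch`; this is purely illustrative and not the PR's code:
    
        import org.rocksdb.ColumnFamilyHandle;
        import org.rocksdb.RocksDB;
        import org.rocksdb.RocksDBException;
        import org.rocksdb.RocksIterator;
        import org.rocksdb.WriteBatch;
        import org.rocksdb.WriteOptions;
        
        final class BatchedInsertSketch {
        
            // Copy all entries visible to the iterator into the target column family,
            // writing them in batches of batchSize instead of one put() per entry.
            static void copyInBatches(
                    RocksIterator iterator, RocksDB targetDb, ColumnFamilyHandle targetHandle,
                    int batchSize) throws RocksDBException {
        
                try (WriteOptions writeOptions = new WriteOptions()) {
                    WriteBatch batch = new WriteBatch();
                    try {
                        int bufferedEntries = 0;
                        for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                            batch.put(targetHandle, iterator.key(), iterator.value());
                            if (++bufferedEntries >= batchSize) {
                                targetDb.write(writeOptions, batch);
                                batch.close();
                                batch = new WriteBatch();
                                bufferedEntries = 0;
                            }
                        }
                        if (bufferedEntries > 0) {
                            targetDb.write(writeOptions, batch);
                        }
                    } finally {
                        batch.close();
                    }
                }
            }
        }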



[GitHub] flink issue #3801: [FLINK-6364] [checkpoints] Implement incremental checkpoi...

Posted by shixiaogang <gi...@git.apache.org>.
Github user shixiaogang commented on the issue:

    https://github.com/apache/flink/pull/3801
  
    Hi @StefanRRichter Thanks a lot for pointing out the problem and for the suggested fix. I have updated the PR as suggested. A `CloseableRegistry` is now used to track the opened I/O streams, and the opened streams are closed first upon cancellation.



[GitHub] flink issue #3801: [FLINK-6364] [checkpoints] Implement incremental checkpoi...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/3801
  
    @gyfora this is already happening in the code: the backend tracks which files have already been transferred to HDFS, and reference counting in the checkpoint coordinator (introduced in FLINK-6014) takes care of deleting SSTable files from the shared HDFS directory once they are no longer referenced by any incremental checkpoint.
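    
    A much simplified illustration of the reference-counting idea (the real `SharedStateRegistry` tracks actual state handles and also discards them; this only shows the counting):
    
        import java.util.HashMap;
        import java.util.Map;
        
        final class RefCountSketch {
        
            private final Map<String, Integer> referenceCounts = new HashMap<>();
        
            // Returns the reference count after registration; 1 means the file is new
            // and still has to be uploaded, a higher count means it is already shared.
            synchronized int register(String fileKey) {
                int newCount = referenceCounts.getOrDefault(fileKey, 0) + 1;
                referenceCounts.put(fileKey, newCount);
                return newCount;
            }
        
            // Returns true when the last reference was released and the file can be deleted.
            synchronized boolean unregister(String fileKey) {
                Integer count = referenceCounts.get(fileKey);
                if (count == null) {
                    throw new IllegalStateException("Unregistered key: " + fileKey);
                }
                if (count == 1) {
                    referenceCounts.remove(fileKey);
                    return true;
                }
                referenceCounts.put(fileKey, count - 1);
                return false;
            }
        }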



[GitHub] flink issue #3801: [FLINK-6364] [checkpoints] Implement incremental checkpoi...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/3801
  
    About 3., I think the self-consolidating/pruning nature of RocksDB w.r.t. the SSTables makes it feasible to make all checkpoints incremental, because the delta does not grow indefinitely for a constant backend size. All savepoints would be full and self-contained.



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114362074
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
     		}
     	}
     
    +	private static class RocksDBIncrementalSnapshotOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private final CheckpointStreamFactory checkpointStreamFactory;
    +
    +		private final long checkpointId;
    +
    +		private final long checkpointTimestamp;
    +
    +		private Map<String, StreamStateHandle> baseSstFiles;
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
    +
    +		private FileSystem backupFileSystem;
    +		private Path backupPath;
    +
    +		private FSDataInputStream inputStream = null;
    +		private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
    +
    +		// new sst files since the last completed checkpoint
    +		private Set<String> newSstFileNames = new HashSet<>();
    +
    +		// handles to the sst files in the current snapshot
    +		private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
    +
    +		// handles to the misc files in the current snapshot
    +		private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
    +
    +		private StreamStateHandle metaStateHandle = null;
    +
    +		private RocksDBIncrementalSnapshotOperation(
    +				RocksDBKeyedStateBackend<?> stateBackend,
    +				CheckpointStreamFactory checkpointStreamFactory,
    +				long checkpointId,
    +				long checkpointTimestamp) {
    +
    +			this.stateBackend = stateBackend;
    +			this.checkpointStreamFactory = checkpointStreamFactory;
    +			this.checkpointId = checkpointId;
    +			this.checkpointTimestamp = checkpointTimestamp;
    +		}
    +
    +		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
    +			try {
    +				final byte[] buffer = new byte[1024];
    +
    +				FileSystem backupFileSystem = backupPath.getFileSystem();
    +				inputStream = backupFileSystem.open(filePath);
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +					inputStream = null;
    +				}
    +
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		private StreamStateHandle materializeMetaData() throws Exception {
    +			try {
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
    +				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
    +
    +				serializationProxy.write(out);
    +
    +				return outputStream.closeAndGetHandle();
    --- End diff --
    
    Same comment about nulling the `outputStream` applies here.



[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114360519
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
     		}
     	}
     
    +	private static class RocksDBIncrementalSnapshotOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private final CheckpointStreamFactory checkpointStreamFactory;
    +
    +		private final long checkpointId;
    +
    +		private final long checkpointTimestamp;
    +
    +		private Map<String, StreamStateHandle> baseSstFiles;
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
    +
    +		private FileSystem backupFileSystem;
    +		private Path backupPath;
    +
    +		private FSDataInputStream inputStream = null;
    +		private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
    +
    +		// new sst files since the last completed checkpoint
    +		private Set<String> newSstFileNames = new HashSet<>();
    +
    +		// handles to the sst files in the current snapshot
    +		private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
    +
    +		// handles to the misc files in the current snapshot
    +		private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
    +
    +		private StreamStateHandle metaStateHandle = null;
    +
    +		private RocksDBIncrementalSnapshotOperation(
    +				RocksDBKeyedStateBackend<?> stateBackend,
    +				CheckpointStreamFactory checkpointStreamFactory,
    +				long checkpointId,
    +				long checkpointTimestamp) {
    +
    +			this.stateBackend = stateBackend;
    +			this.checkpointStreamFactory = checkpointStreamFactory;
    +			this.checkpointId = checkpointId;
    +			this.checkpointTimestamp = checkpointTimestamp;
    +		}
    +
    +		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
    +			try {
    +				final byte[] buffer = new byte[1024];
    +
    +				FileSystem backupFileSystem = backupPath.getFileSystem();
    +				inputStream = backupFileSystem.open(filePath);
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +					inputStream = null;
    +				}
    +
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		private StreamStateHandle materializeMetaData() throws Exception {
    +			try {
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
    +				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
    +
    +				serializationProxy.write(out);
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		void takeSnapshot() throws Exception {
    +			// use the last completed checkpoint as the comparison base.
    +			baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
    +
    +			// save meta data
    +			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) {
    +
    +				RegisteredBackendStateMetaInfo<?, ?> metaInfo = stateMetaInfoEntry.getValue().f1;
    +
    +				KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy =
    +					new KeyedBackendSerializationProxy.StateMetaInfo<>(
    +						metaInfo.getStateType(),
    +						metaInfo.getName(),
    +						metaInfo.getNamespaceSerializer(),
    +						metaInfo.getStateSerializer());
    +
    +				stateMetaInfos.add(metaInfoProxy);
    +			}
    +
    +			// save state data
    +			backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
    +			backupFileSystem = backupPath.getFileSystem();
    +			if (backupFileSystem.exists(backupPath)) {
    +				LOG.warn("Deleting an existing local checkpoint directory " +
    +					backupPath + ".");
    +
    +				backupFileSystem.delete(backupPath, true);
    +			}
    +
    +			// create hard links of living files in the checkpoint path
    +			Checkpoint checkpoint = Checkpoint.create(stateBackend.db);
    +			checkpoint.createCheckpoint(backupPath.getPath());
    +		}
    +
    +		KeyedStateHandle materializeSnapshot() throws Exception {
    +			// write meta data
    +			metaStateHandle = materializeMetaData();
    +
    +			// write state data
    +			Preconditions.checkState(backupFileSystem.exists(backupPath));
    +
    +			FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath);
    +			if (fileStatuses != null ) {
    +				for (FileStatus fileStatus : fileStatuses) {
    +					Path filePath = fileStatus.getPath();
    +					String fileName = filePath.getName();
    +
    +					if (fileName.endsWith(SST_FILE_SUFFIX)) {
    +						StreamStateHandle fileHandle =
    +							baseSstFiles == null ? null : baseSstFiles.get(fileName);
    +
    +						if (fileHandle == null) {
    +							newSstFileNames.add(fileName);
    +							fileHandle = materializeStateData(filePath);
    +						}
    +
    +						sstFiles.put(fileName, fileHandle);
    +					} else {
    +						StreamStateHandle fileHandle = materializeStateData(filePath);
    +						miscFiles.put(fileName, fileHandle);
    +					}
    +				}
    +			}
    +
    +			stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
    +
    +			return new RocksDBKeyedStateHandle(stateBackend.jobId,
    +				stateBackend.operatorIdentifier, stateBackend.keyGroupRange,
    +				newSstFileNames, sstFiles, miscFiles, metaStateHandle);
    +		}
    +
    +		void releaseResources(boolean canceled) {
    +
    +			if (inputStream != null) {
    +				stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +				try {
    +					inputStream.close();
    +				} catch (Exception e) {
    +					LOG.warn("Could not properly close the input stream.", e);
    +				}
    +				inputStream = null;
    +			}
    +
    +			if (outputStream != null) {
    +				stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +				try {
    +					outputStream.close();
    +				} catch (Exception e) {
    +					LOG.warn("Could not properly close the output stream.", e);
    +				}
    +				outputStream = null;
    +			}
    +
    +			if (backupPath != null) {
    +				try {
    +					if (backupFileSystem.exists(backupPath)) {
    +						backupFileSystem.delete(backupPath, true);
    +					}
    +				} catch (Exception e) {
    +					LOG.warn("Could not properly delete the checkpoint directory.", e);
    +				}
    +			}
    +
    +			if (canceled) {
    +				List<StateObject> statesToDiscard = new ArrayList<>();
    +
    +				if (metaStateHandle != null) {
    +					statesToDiscard.add(metaStateHandle);
    +				}
    +
    +				statesToDiscard.addAll(miscFiles.values());
    +
    +				for (String newSstFileName : newSstFileNames) {
    +					StreamStateHandle fileHandle = sstFiles.get(newSstFileName);
    +					if (fileHandle != null) {
    +						statesToDiscard.add(fileHandle);
    +					}
    +				}
    +
    +				try {
    +					StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
    --- End diff --
    
    The same comment about cleanup also kind of applies to the local checkpointing dirs.

