Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/01/16 12:49:59 UTC

[GitHub] StefanRRichter commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

URL: https://github.com/apache/flink/pull/7351#discussion_r248267921
 
 

 ##########
 File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##########
 @@ -61,6 +80,116 @@ static void transferAllStateDataToDirectory(
 		downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
 	}
 
+	/**
+	 * Upload all the files to checkpoint fileSystem using specified number of threads.
+	 *
+	 * @param files The files will be uploaded to checkpoint filesystem.
+	 * @param numberOfSnapshottingThreads The number of threads used to upload the files.
+	 * @param checkpointStreamFactory The checkpoint streamFactory used to create outputstream.
+	 * @param closeableRegistry
+	 *
+	 * @throws Exception Thrown if can not upload all the files.
+	 */
+	public static Map<StateHandleID, StreamStateHandle> uploadFilesToCheckpointFs(
+		@Nonnull Map<StateHandleID, Path> files,
+		int numberOfSnapshottingThreads,
+		CheckpointStreamFactory checkpointStreamFactory,
+		CloseableRegistry closeableRegistry) throws Exception {
+
+		Map<StateHandleID, StreamStateHandle> handles = new HashMap<>();
+
+		ExecutorService executorService = createExecutorService(numberOfSnapshottingThreads);
 
 Review comment:
  This line makes me wonder whether it is a good idea for this to be just a static utils class instead of a proper class. One advantage of executor services is that they can reuse their threads instead of recreating them all the time. I agree that this method is not called in any hot loop, so the overhead might not be dramatic, but I still wonder why not make this a proper class that creates a thread pool once and only shuts it down at the end of the object's lifecycle. Any reason why you chose this approach?
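  A minimal sketch of the alternative suggested above, assuming a plain JDK setting. It is not taken from the PR. The class name `StateFileUploader` and the per-file `uploadOneFile` function are hypothetical. The generic parameters `K` and `H` stand in for Flink's `StateHandleID` and `StreamStateHandle` so the example compiles without Flink on the classpath. The pool is created once in the constructor, reused by every `uploadFiles` call, and shut down only in `close()`.

```java
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/**
 * Hypothetical sketch: owns one fixed-size thread pool for its whole lifetime
 * instead of creating a new ExecutorService on every upload call.
 * K stands in for StateHandleID, H for StreamStateHandle.
 */
final class StateFileUploader<K, H> implements AutoCloseable {

    private final ExecutorService executorService;
    // Hypothetical per-file upload step; in Flink this would copy the file into a
    // stream obtained from a CheckpointStreamFactory and return its state handle.
    private final Function<Path, H> uploadOneFile;

    StateFileUploader(int numberOfSnapshottingThreads, Function<Path, H> uploadOneFile) {
        // Threads are created once here, not on every call to uploadFiles.
        this.executorService = Executors.newFixedThreadPool(numberOfSnapshottingThreads);
        this.uploadOneFile = uploadOneFile;
    }

    /** Uploads all files in parallel on the shared pool and waits for every result. */
    Map<K, H> uploadFiles(Map<K, Path> files) throws InterruptedException, ExecutionException {
        Map<K, CompletableFuture<H>> futures = new HashMap<>();
        for (Map.Entry<K, Path> entry : files.entrySet()) {
            Path file = entry.getValue();
            futures.put(entry.getKey(),
                CompletableFuture.supplyAsync(() -> uploadOneFile.apply(file), executorService));
        }
        Map<K, H> handles = new HashMap<>();
        for (Map.Entry<K, CompletableFuture<H>> entry : futures.entrySet()) {
            // get() rethrows any upload failure wrapped in an ExecutionException.
            handles.put(entry.getKey(), entry.getValue().get());
        }
        return handles;
    }

    /** Shuts the pool down exactly once, at the end of the object's lifecycle. */
    @Override
    public void close() {
        executorService.shutdownNow();
    }
}
```

  With this design, the owner of the uploader (for example, the component that drives snapshots) is responsible for calling `close()` when it is disposed. Otherwise the pool's threads outlive it.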

----------------------------------------------------------------
This is an automated message from the Apache Git Service.