Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/02/21 06:49:54 UTC

[GitHub] [iceberg] yittg opened a new pull request #4177: Use particular worker pool for flink jobs

yittg opened a new pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177


   Fixes #3776 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824173208



##########
File path: core/src/main/java/org/apache/iceberg/util/ThreadPools.java
##########
@@ -61,6 +62,26 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  public static ExecutorService newWorkerPool(String namePrefix, Integer parallelism) {
+    return MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            Optional.ofNullable(parallelism).orElse(WORKER_THREAD_POOL_SIZE),
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat(namePrefix + "-%d")
+                .build()));
+  }
+
+  public static ExecutorService newKeyedWorkerPool(String key, String namePrefix, Integer parallelism) {

Review comment:
       I don't think that we need to keep worker pools here in a static map.
   
   The two places where this is called immediately set up a callback that calls shutdown, but could easily keep a reference to the worker pool locally instead of storing it here by name.
   
   I think it would be better to avoid keeping track of pools here.
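A minimal sketch of the alternative described above, assuming a hypothetical operator class (`LocalPoolOperator`) with Flink-style `open()`/`close()` lifecycle methods: the operator keeps its own pool reference and shuts it down in `close()`, so no static map is needed. It uses a plain `Executors.newFixedThreadPool` rather than the Guava-wrapped pool from the diff.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical operator with Flink-style open()/close() lifecycle methods.
class LocalPoolOperator implements AutoCloseable {
  private final int poolSize;
  // Owned by this operator instance; never registered in a static map.
  private ExecutorService workerPool;

  LocalPoolOperator(int poolSize) {
    this.poolSize = poolSize;
  }

  void open() {
    this.workerPool = Executors.newFixedThreadPool(poolSize, runnable -> {
      Thread thread = new Thread(runnable);
      thread.setDaemon(true); // do not keep the JVM alive
      return thread;
    });
  }

  // Runs one unit of stand-in work on the operator's own pool.
  int process(int value) throws Exception {
    Future<Integer> result = workerPool.submit(() -> value * 2);
    return result.get();
  }

  boolean isPoolShutdown() {
    return workerPool != null && workerPool.isShutdown();
  }

  @Override
  public void close() {
    // The local reference is all that is needed to release the threads.
    if (workerPool != null) {
      workerPool.shutdown();
    }
  }
}
```

Because each instance owns exactly one pool, the shutdown callback needs no lookup by name.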






[GitHub] [iceberg] rdblue commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824867925



##########
File path: core/src/main/java/org/apache/iceberg/util/ThreadPools.java
##########
@@ -61,6 +62,26 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  public static ExecutorService newWorkerPool(String namePrefix, Integer parallelism) {
+    return MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            Optional.ofNullable(parallelism).orElse(WORKER_THREAD_POOL_SIZE),
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat(namePrefix + "-%d")
+                .build()));
+  }
+
+  public static ExecutorService newKeyedWorkerPool(String key, String namePrefix, Integer parallelism) {

Review comment:
       @stevenzwu, for that use case, maybe we should follow up this PR with one that allows you to configure a named thread pool? I think that's probably the use case @yittg had in mind when he set up sharing.






[GitHub] [iceberg] yittg commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
yittg commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824320365



##########
File path: core/src/main/java/org/apache/iceberg/util/ThreadPools.java
##########
@@ -61,6 +62,26 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  public static ExecutorService newWorkerPool(String namePrefix, Integer parallelism) {
+    return MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            Optional.ofNullable(parallelism).orElse(WORKER_THREAD_POOL_SIZE),
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat(namePrefix + "-%d")
+                .build()));
+  }
+
+  public static ExecutorService newKeyedWorkerPool(String key, String namePrefix, Integer parallelism) {

Review comment:
       @rdblue, sorry, I don't quite get your point. Let me guess: do you really mean sharing pools among all sources or among all sinks, rather than among all sources and sinks together?
   To be clear, suppose for example that a job consists of:
   Source: Iceberg A (parallelism: 3), Source: Iceberg B, Sink: Iceberg C, Sink: Iceberg D.
   Which option do you prefer?
   
   1. share between all parallel subtasks of one operator, like the 3 subtasks of Iceberg A (they can run in different slots in one TaskManager or in different TaskManagers);
   2. share between all sources or between all sinks, like one pool shared by A and B, and another shared by C and D;
   3. share between all operators, like one pool shared by A, B, C, and D; all subtasks in the same TaskManager can share it.
   
   
   






[GitHub] [iceberg] yittg commented on pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
yittg commented on pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#issuecomment-1047433769


   @rdblue @openinx Please help review this change.




[GitHub] [iceberg] rdblue commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r812198410



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -350,11 +354,19 @@ public void endInput() throws IOException {
     return SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, deltaManifests);
   }
 
+  @Override
+  public void open() throws Exception {
+    this.workerPool = ThreadPools.newWorkerPool("iceberg-files-committer-worker-pool-%d");
+  }
+
   @Override
   public void close() throws Exception {
     if (tableLoader != null) {
       tableLoader.close();
     }
+    if (workerPool != null) {

Review comment:
       Nit: we add a newline between a control flow block and the following statement.






[GitHub] [iceberg] rdblue commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r812199983



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
##########
@@ -79,6 +83,11 @@ public StreamingMonitorFunction(TableLoader tableLoader, ScanContext scanContext
     this.scanContext = scanContext;
   }
 
+  @Override
+  public void open(Configuration parameters) throws Exception {

Review comment:
       Also, is there a way to share pools if there are multiple Iceberg operators in the same Flink job?






[GitHub] [iceberg] yittg commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
yittg commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824348863



##########
File path: core/src/main/java/org/apache/iceberg/util/ThreadPools.java
##########
@@ -61,6 +62,26 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  public static ExecutorService newWorkerPool(String namePrefix, Integer parallelism) {
+    return MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            Optional.ofNullable(parallelism).orElse(WORKER_THREAD_POOL_SIZE),
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat(namePrefix + "-%d")
+                .build()));
+  }
+
+  public static ExecutorService newKeyedWorkerPool(String key, String namePrefix, Integer parallelism) {

Review comment:
       So none of the sharing mentioned in [this comment](https://github.com/apache/iceberg/pull/4177#discussion_r812199983) is needed, right? That makes sense to me.






[GitHub] [iceberg] yittg commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
yittg commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r825237392



##########
File path: core/src/main/java/org/apache/iceberg/util/ThreadPools.java
##########
@@ -61,6 +62,26 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  public static ExecutorService newWorkerPool(String namePrefix, Integer parallelism) {
+    return MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            Optional.ofNullable(parallelism).orElse(WORKER_THREAD_POOL_SIZE),
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat(namePrefix + "-%d")
+                .build()));
+  }
+
+  public static ExecutorService newKeyedWorkerPool(String key, String namePrefix, Integer parallelism) {

Review comment:
       Yeah. Although I didn't think about sharing at the beginning, some kind of sharing or a global limit sounds good to me after some consideration. I think we can provide a reasonable solution as a next step.
   Thanks, @rdblue and @stevenzwu.






[GitHub] [iceberg] yittg commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
yittg commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r812574121



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
##########
@@ -79,6 +83,11 @@ public StreamingMonitorFunction(TableLoader tableLoader, ScanContext scanContext
     this.scanContext = scanContext;
   }
 
+  @Override
+  public void open(Configuration parameters) throws Exception {

Review comment:
       > Should the pool size be configured by parameters?
   
   It's configured from the scan context.
   
   > Also, is there a way to share pools if there are multiple Iceberg operators in the same Flink job?
   
   I think it's hard to share pools, and sharing easily becomes meaningless across distributed nodes.
   What do you think, @rdblue?






[GitHub] [iceberg] yittg commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
yittg commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r812575439



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -129,6 +135,10 @@ public void initializeState(StateInitializationContext context) throws Exception
     Preconditions.checkArgument(maxContinuousEmptyCommits > 0,
         MAX_CONTINUOUS_EMPTY_COMMITS + " must be positive");
 
+    int manifestsScanParallelism = PropertyUtil.propertyAsInt(
+        table.properties(), MANIFESTS_SCAN_PARALLELISM, ThreadPools.WORKER_THREAD_POOL_SIZE);
+    this.workerPool = ThreadPools.newWorkerPool("iceberg-files-committer-worker-pool", manifestsScanParallelism);

Review comment:
       Configured from `table#properties`, like the `MAX_CONTINUOUS_EMPTY_COMMITS` setting, to keep it simple for now. We can add other ways if they're needed in the future.
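A minimal sketch of reading a pool size from table properties with a default and a positivity check, mirroring the `MAX_CONTINUOUS_EMPTY_COMMITS` precondition above. `TablePropertySketch`, its `propertyAsInt` stand-in for Iceberg's `PropertyUtil.propertyAsInt`, and the property key string are all hypothetical; the actual string value of `MANIFESTS_SCAN_PARALLELISM` is not shown in this thread.

```java
import java.util.Map;

final class TablePropertySketch {
  // Hypothetical key; not the real value of MANIFESTS_SCAN_PARALLELISM.
  static final String MANIFESTS_SCAN_PARALLELISM_KEY = "example.manifests-scan-parallelism";

  private TablePropertySketch() {
  }

  // Stand-in for an int-property lookup that falls back to a default when the key is absent.
  static int propertyAsInt(Map<String, String> properties, String key, int defaultValue) {
    String value = properties.get(key);
    return value != null ? Integer.parseInt(value) : defaultValue;
  }

  // Reads the pool size and rejects non-positive values, since a pool needs at least one thread.
  static int manifestsScanParallelism(Map<String, String> tableProperties, int defaultValue) {
    int parallelism = propertyAsInt(tableProperties, MANIFESTS_SCAN_PARALLELISM_KEY, defaultValue);
    if (parallelism <= 0) {
      throw new IllegalArgumentException(MANIFESTS_SCAN_PARALLELISM_KEY + " must be positive");
    }

    return parallelism;
  }
}
```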






[GitHub] [iceberg] stevenzwu commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824154868



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
##########
@@ -79,6 +83,16 @@ public StreamingMonitorFunction(TableLoader tableLoader, ScanContext scanContext
     this.scanContext = scanContext;
   }
 
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+
+    final String jobId = getRuntimeContext().getJobId().toString();
+    this.workerPool = ThreadPools.newKeyedWorkerPool(jobId, "flink-worker-pool", scanContext.planParallelism());

Review comment:
       I see that this shares the same key as `IcebergFilesCommitter`, but not `FlinkInputFormat`. I'm trying to understand the reason.






[GitHub] [iceberg] rdblue commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824174612



##########
File path: core/src/main/java/org/apache/iceberg/util/ThreadPools.java
##########
@@ -61,6 +62,26 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  public static ExecutorService newWorkerPool(String namePrefix, Integer parallelism) {
+    return MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            Optional.ofNullable(parallelism).orElse(WORKER_THREAD_POOL_SIZE),

Review comment:
       I'm okay either way.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824874718



##########
File path: core/src/main/java/org/apache/iceberg/util/ThreadPools.java
##########
@@ -61,6 +62,26 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  public static ExecutorService newWorkerPool(String namePrefix, Integer parallelism) {
+    return MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            Optional.ofNullable(parallelism).orElse(WORKER_THREAD_POOL_SIZE),
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat(namePrefix + "-%d")
+                .build()));
+  }
+
+  public static ExecutorService newKeyedWorkerPool(String key, String namePrefix, Integer parallelism) {

Review comment:
       In addition to the documentation change, we should also make sure this behavior change is captured in the release notes for the next minor release, 0.14.0. @rdblue, where do we track future release notes?






[GitHub] [iceberg] stevenzwu commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824150239



##########
File path: core/src/main/java/org/apache/iceberg/util/ThreadPools.java
##########
@@ -61,6 +62,26 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  public static ExecutorService newWorkerPool(String namePrefix, Integer parallelism) {
+    return MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            Optional.ofNullable(parallelism).orElse(WORKER_THREAD_POOL_SIZE),

Review comment:
       Should we make the param a primitive type and provide an overloaded method without the poolSize param?
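A minimal sketch of the suggested API shape, assuming a hypothetical `WorkerPoolsSketch` class and default size: a primitive `int` cannot be null, so the `Optional.ofNullable(...)` fallback disappears, and callers without a size use the one-argument overload. Thread naming uses a plain `ThreadFactory` instead of Guava's `ThreadFactoryBuilder`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class WorkerPoolsSketch {
  // Hypothetical default; the diff uses ThreadPools.WORKER_THREAD_POOL_SIZE instead.
  static final int DEFAULT_POOL_SIZE = Runtime.getRuntime().availableProcessors();

  private WorkerPoolsSketch() {
  }

  // Overload for callers that have no explicit size.
  static ExecutorService newWorkerPool(String namePrefix) {
    return newWorkerPool(namePrefix, DEFAULT_POOL_SIZE);
  }

  // Primitive int: no null check or Optional fallback is needed.
  static ExecutorService newWorkerPool(String namePrefix, int poolSize) {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory = runnable -> {
      Thread thread = new Thread(runnable, namePrefix + "-" + counter.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    };
    // Throws IllegalArgumentException if poolSize <= 0.
    return Executors.newFixedThreadPool(poolSize, factory);
  }
}
```

The trade-off is that a caller holding a nullable `Integer` (for example, an optional config value) must now choose the overload explicitly.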






[GitHub] [iceberg] yittg commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
yittg commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824496501



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
##########
@@ -76,9 +78,12 @@ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
   public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
     // Called in Job manager, so it is OK to load table from catalog.
     tableLoader.open();
+    final ExecutorService workerPool = ThreadPools.newWorkerPool("iceberg-plan-worker-pool", context.planParallelism());
     try (TableLoader loader = tableLoader) {
       Table table = loader.loadTable();
-      return FlinkSplitPlanner.planInputSplits(table, context);
+      return FlinkSplitPlanner.planInputSplits(table, context, workerPool);
+    } finally {
+      workerPool.shutdown();

Review comment:
       This function is called in the client and the job manager, so there is no context here. Given that it's an ad hoc pool that will be shut down after planning, I think it's OK to name it this way.
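The ad hoc planning pool in the diff follows a create, use, shut-down-in-`finally` pattern. A minimal sketch, where `AdHocPlanning.planSplits` and its string "splits" are hypothetical stand-ins for `FlinkSplitPlanner.planInputSplits` and real input splits:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class AdHocPlanning {
  private AdHocPlanning() {
  }

  // Hypothetical stand-in for split planning: each "manifest" is processed on the pool.
  static List<String> planSplits(List<String> manifests, int planParallelism)
      throws InterruptedException, ExecutionException {
    // The pool lives only for the duration of this call.
    ExecutorService workerPool = Executors.newFixedThreadPool(planParallelism);
    try {
      List<Future<String>> futures = new ArrayList<>();
      for (String manifest : manifests) {
        futures.add(workerPool.submit(() -> "split:" + manifest));
      }

      List<String> splits = new ArrayList<>();
      for (Future<String> future : futures) {
        splits.add(future.get()); // preserves input order
      }

      return splits;
    } finally {
      // Runs on success and on failure, so threads never outlive planning.
      workerPool.shutdown();
    }
  }
}
```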
   
   






[GitHub] [iceberg] rdblue commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824870589



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
##########
@@ -79,6 +83,11 @@ public StreamingMonitorFunction(TableLoader tableLoader, ScanContext scanContext
     this.scanContext = scanContext;
   }
 
+  @Override
+  public void open(Configuration parameters) throws Exception {

Review comment:
       Hey, sorry about this. I think my comment here is probably what caused the confusion about sharing pools by job ID. I think there are use cases for this (Steven has at least one), but let's focus on fixing the problem here and share resources later.
   
   Thanks for your patience, @yittg!






[GitHub] [iceberg] rdblue merged pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177


   




[GitHub] [iceberg] stevenzwu commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824206152



##########
File path: core/src/main/java/org/apache/iceberg/util/ThreadPools.java
##########
@@ -61,6 +62,26 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  public static ExecutorService newWorkerPool(String namePrefix, Integer parallelism) {
+    return MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            Optional.ofNullable(parallelism).orElse(WORKER_THREAD_POOL_SIZE),
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat(namePrefix + "-%d")
+                .build()));
+  }
+
+  public static ExecutorService newKeyedWorkerPool(String key, String namePrefix, Integer parallelism) {

Review comment:
       If the intention is to reuse the job-specific thread pool in Flink, then we do need the static cache, since the same keyed pool may be requested from multiple code paths.
   
   Is this a Flink-only problem regarding the classloader issue with thread pools? If so, maybe we can move the keyed cache into the Flink module.
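To make the trade-off concrete: if pools were shared per job, the cache would also need reference counting so that a pool is shut down only when its last user releases it. A minimal sketch, assuming a hypothetical `KeyedPoolCache` class (in practice it would sit behind a static field); note that a later caller's `poolSize` is ignored once a pool for its key exists.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical keyed cache: callers using the same key (e.g. a job ID) share one pool.
final class KeyedPoolCache {
  private static final class Entry {
    final ExecutorService pool;
    int refCount;

    Entry(ExecutorService pool) {
      this.pool = pool;
    }
  }

  private final Map<String, Entry> pools = new HashMap<>();

  // Returns the pool for the key, creating it on first use; poolSize only matters then.
  synchronized ExecutorService acquire(String key, int poolSize) {
    Entry entry = pools.computeIfAbsent(key, k -> new Entry(Executors.newFixedThreadPool(poolSize)));
    entry.refCount++;
    return entry.pool;
  }

  // Shuts the pool down and forgets the key when the last user releases it.
  synchronized void release(String key) {
    Entry entry = pools.get(key);
    if (entry == null) {
      return;
    }

    entry.refCount--;
    if (entry.refCount == 0) {
      pools.remove(key);
      entry.pool.shutdown();
    }
  }

  synchronized int size() {
    return pools.size();
  }
}
```

This bookkeeping is exactly the static state the no-sharing approach avoids.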






[GitHub] [iceberg] stevenzwu commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824859312



##########
File path: core/src/main/java/org/apache/iceberg/util/ThreadPools.java
##########
@@ -61,6 +62,26 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  public static ExecutorService newWorkerPool(String namePrefix, Integer parallelism) {
+    return MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            Optional.ofNullable(parallelism).orElse(WORKER_THREAD_POOL_SIZE),
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat(namePrefix + "-%d")
+                .build()));
+  }
+
+  public static ExecutorService newKeyedWorkerPool(String key, String namePrefix, Integer parallelism) {

Review comment:
       After reviewing the usage of the thread pools, I am also in favor of not sharing them, so that we can avoid the static cache. None of the usages are in parallel tasks:
   * source: split planning (running on the job manager or in the single-parallelism StreamingMonitorFunction)
   * sink: the single-parallelism committer
   






[GitHub] [iceberg] rdblue commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824171849



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
##########
@@ -79,6 +83,16 @@ public StreamingMonitorFunction(TableLoader tableLoader, ScanContext scanContext
     this.scanContext = scanContext;
   }
 
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+
+    final String jobId = getRuntimeContext().getJobId().toString();
+    this.workerPool = ThreadPools.newKeyedWorkerPool(jobId, "flink-worker-pool", scanContext.planParallelism());

Review comment:
       I agree here. Since this is creating a different thread pool per job ID, the thread name prefix should also include the job ID to get unique names.
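A minimal sketch of a thread factory whose names embed the job ID, assuming a hypothetical `JobScopedThreadFactory` class; names follow the same `<prefix>-<n>` numbering as the `namePrefix + "-%d"` format in the diff, with the counter starting at 0.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory producing names like "flink-worker-pool-<jobId>-0".
final class JobScopedThreadFactory implements ThreadFactory {
  private final String prefix;
  private final AtomicInteger counter = new AtomicInteger();

  JobScopedThreadFactory(String basePrefix, String jobId) {
    this.prefix = basePrefix + "-" + jobId;
  }

  @Override
  public Thread newThread(Runnable runnable) {
    Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
    thread.setDaemon(true); // matches setDaemon(true) in the diff
    return thread;
  }
}
```

Passing an instance to `Executors.newFixedThreadPool(size, factory)` makes thread dumps show which job each worker belongs to.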






[GitHub] [iceberg] rdblue commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824345200



##########
File path: core/src/main/java/org/apache/iceberg/util/ThreadPools.java
##########
@@ -61,6 +62,26 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  public static ExecutorService newWorkerPool(String namePrefix, Integer parallelism) {
+    return MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            Optional.ofNullable(parallelism).orElse(WORKER_THREAD_POOL_SIZE),
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat(namePrefix + "-%d")
+                .build()));
+  }
+
+  public static ExecutorService newKeyedWorkerPool(String key, String namePrefix, Integer parallelism) {

Review comment:
       What I was thinking was a pool per operator in a job, rather than a pool per job. That avoids the need to track thread pools by some key in static state. I think it is probably fine to have more pools since these are primarily for IO. Does that sound reasonable?
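A minimal sketch of the "pool per operator" idea. `PoolOwningOperator` is a hypothetical stand-in for an operator such as `StreamingMonitorFunction`; it is not Flink's API and only models the `open()`/`close()` lifecycle. Each instance creates its pool in `open()` and shuts it down in `close()`, so no static map or key lookup is needed.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical operator stand-in; only the ownership of the pool matters here.
class PoolOwningOperator implements AutoCloseable {
  private final String operatorName;
  private final int poolSize;
  private ExecutorService workerPool;

  PoolOwningOperator(String operatorName, int poolSize) {
    this.operatorName = operatorName;
    this.poolSize = poolSize;
  }

  // Each operator instance creates its own pool; nothing is shared or cached.
  void open() {
    AtomicInteger counter = new AtomicInteger();
    workerPool = Executors.newFixedThreadPool(poolSize, runnable -> {
      Thread thread = new Thread(runnable, operatorName + "-worker-pool-" + counter.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    });
  }

  ExecutorService workerPool() {
    return workerPool;
  }

  // The owner shuts its own pool down, so no threads outlive the operator.
  @Override
  public void close() {
    if (workerPool == null) {
      return;
    }
    workerPool.shutdown();
    try {
      if (!workerPool.awaitTermination(10, TimeUnit.SECONDS)) {
        workerPool.shutdownNow();
      }
    } catch (InterruptedException e) {
      workerPool.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }
}
```

The trade-off is more pools in total. As noted above, that seems acceptable because these threads mostly wait on IO.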




[GitHub] [iceberg] rdblue commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r812197949



##########
File path: core/src/main/java/org/apache/iceberg/util/ThreadPools.java
##########
@@ -61,6 +55,16 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  public static ExecutorService newWorkerPool(String nameFormat) {

Review comment:
       In other places, we use a name prefix and add `"-%d"` to that prefix so that we can guarantee it is there.
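A minimal sketch of the "prefix plus `-%d`" convention. It uses a hypothetical `PrefixedThreadFactory` in place of Guava's `ThreadFactoryBuilder.setNameFormat`, which takes a `String.format`-style pattern. Callers pass only a prefix, and the factory always appends the counter. Escaping `%` in the prefix is an extra precaution that the PR code does not take.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

// Callers supply only a prefix; the numeric suffix is always added here, so
// every thread name in a pool is distinct.
final class PrefixedThreadFactory implements ThreadFactory {
  private final String nameFormat;
  private final AtomicLong count = new AtomicLong();

  PrefixedThreadFactory(String namePrefix) {
    // Escape '%' in the prefix so String.format only interprets the appended "%d".
    this.nameFormat = namePrefix.replace("%", "%%") + "-%d";
  }

  @Override
  public Thread newThread(Runnable runnable) {
    Thread thread = new Thread(runnable, String.format(nameFormat, count.getAndIncrement()));
    thread.setDaemon(true);
    return thread;
  }
}
```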




[GitHub] [iceberg] rdblue commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r812200354



##########
File path: core/src/main/java/org/apache/iceberg/util/ThreadPools.java
##########
@@ -61,6 +55,16 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  public static ExecutorService newWorkerPool(String nameFormat) {

Review comment:
       And I think this should also accept a parallelism argument so that it is easily configurable.
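A minimal sketch of a configurable pool size, modelled on the `Optional.ofNullable(parallelism).orElse(...)` fallback in the diff. `WorkerPools` and `DEFAULT_POOL_SIZE` are hypothetical. The default is assumed here to be the number of available processors, while the PR falls back to Iceberg's `WORKER_THREAD_POOL_SIZE`. The argument is named `poolSize`, as suggested elsewhere in this review.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class WorkerPools {
  // Hypothetical default used when no size is configured.
  static final int DEFAULT_POOL_SIZE = Runtime.getRuntime().availableProcessors();

  private WorkerPools() {}

  // A null size means "not configured" and falls back to the default;
  // non-positive sizes are rejected instead of failing later inside the executor.
  static int resolvePoolSize(Integer requested) {
    int size = requested == null ? DEFAULT_POOL_SIZE : requested;
    if (size <= 0) {
      throw new IllegalArgumentException("Pool size must be positive: " + size);
    }
    return size;
  }

  static ExecutorService newWorkerPool(String namePrefix, Integer poolSize) {
    AtomicInteger counter = new AtomicInteger();
    return Executors.newFixedThreadPool(resolvePoolSize(poolSize), runnable -> {
      Thread thread = new Thread(runnable, namePrefix + "-" + counter.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    });
  }
}
```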




[GitHub] [iceberg] yittg commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
yittg commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r815958309



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
##########
@@ -79,6 +83,11 @@ public StreamingMonitorFunction(TableLoader tableLoader, ScanContext scanContext
     this.scanContext = scanContext;
   }
 
+  @Override
+  public void open(Configuration parameters) throws Exception {

Review comment:
       @rdblue, do you think sharing one pool within a job is a blocking issue? If so, we can provide a pool keyed by job. That would be a reasonably equivalent replacement for the original shared pool :)
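For comparison with the per-operator approach argued for elsewhere in this thread, here is a minimal sketch of a job-keyed registry. `KeyedWorkerPools`, `getOrCreate`, and `shutdown` are hypothetical names, and this is not the PR's implementation. The static map is exactly the shared state the other comments want to avoid.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical job-keyed registry backed by static, JVM-wide state.
final class KeyedWorkerPools {
  private static final ConcurrentMap<String, ExecutorService> POOLS = new ConcurrentHashMap<>();

  private KeyedWorkerPools() {}

  // Operators of the same job pass the same key and receive the same pool.
  // The size only takes effect for the first caller; later calls reuse the pool.
  static ExecutorService getOrCreate(String jobKey, int poolSize) {
    return POOLS.computeIfAbsent(jobKey, key -> {
      AtomicInteger counter = new AtomicInteger();
      return Executors.newFixedThreadPool(poolSize, runnable -> {
        Thread thread = new Thread(runnable, "flink-worker-pool-" + key + "-" + counter.getAndIncrement());
        thread.setDaemon(true);
        return thread;
      });
    });
  }

  // Intended to run once per job, e.g. when the job's class loader is released.
  static void shutdown(String jobKey) {
    ExecutorService pool = POOLS.remove(jobKey);
    if (pool != null) {
      pool.shutdown();
    }
  }

  static int activePools() {
    return POOLS.size();
  }
}
```

Note the silent "first size wins" behavior. A second operator asking for a different size gets the existing pool, which is one reason a keyed cache is harder to reason about than a pool owned by each operator.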




[GitHub] [iceberg] stevenzwu commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824859312



##########
File path: core/src/main/java/org/apache/iceberg/util/ThreadPools.java
##########
@@ -61,6 +62,26 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  public static ExecutorService newWorkerPool(String namePrefix, Integer parallelism) {
+    return MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            Optional.ofNullable(parallelism).orElse(WORKER_THREAD_POOL_SIZE),
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat(namePrefix + "-%d")
+                .build()));
+  }
+
+  public static ExecutorService newKeyedWorkerPool(String key, String namePrefix, Integer parallelism) {

Review comment:
       After reviewing how the thread pools are used, I am also in favor of not sharing them, so that we can avoid the static cache. None of the usages are in parallel tasks:
   * source: split planning (running on the jobmanager or in the single-parallelism StreamingMonitorFunction)
   * sink: single-parallelism committer
   
   But we do need to add some user documentation to clarify the behavior change for the I/O thread pool. Previously, there was one globally shared thread pool per JVM; now there is one per source/sink. For example, internally we have a rather unusual setup where a single Flink job (running on many taskmanagers) ingests data into dozens or hundreds of Iceberg tables. In such setups, users would need to reduce the pool size, probably to 1, to avoid creating an excessive number of threads in the JVM.
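A back-of-the-envelope sketch that makes the thread-count concern concrete. `ThreadBudget` is hypothetical. It assumes the worst case, in which every Iceberg operator with its own pool (for example, one committer per table) lands in the same JVM and all of its threads have been started. The numbers in the test (200 tables, 16 threads) are illustrative, not measurements.

```java
// Worst-case estimate of worker threads in one JVM when each Iceberg
// source/sink operator owns a fixed-size pool. The old behavior corresponds
// to poolsInJvm = 1 (a single shared pool).
final class ThreadBudget {
  private ThreadBudget() {}

  static long workerThreads(int poolsInJvm, int poolSize) {
    // Widen to long before multiplying to avoid int overflow for large inputs.
    return (long) poolsInJvm * poolSize;
  }
}
```

With one shared pool of 16 threads, the JVM has 16 workers. With 200 per-table committers at 16 threads each, it could reach 3,200, and at a pool size of 1 it would be 200.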




[GitHub] [iceberg] rdblue commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824287567



##########
File path: core/src/main/java/org/apache/iceberg/util/ThreadPools.java
##########
@@ -61,6 +62,26 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  public static ExecutorService newWorkerPool(String namePrefix, Integer parallelism) {
+    return MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            Optional.ofNullable(parallelism).orElse(WORKER_THREAD_POOL_SIZE),
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat(namePrefix + "-%d")
+                .build()));
+  }
+
+  public static ExecutorService newKeyedWorkerPool(String key, String namePrefix, Integer parallelism) {

Review comment:
       Oh, so the job can share between the monitor and the sink? I don't really mind having two pools for that.




[GitHub] [iceberg] rdblue commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824900467



##########
File path: core/src/main/java/org/apache/iceberg/util/ThreadPools.java
##########
@@ -61,6 +62,26 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  public static ExecutorService newWorkerPool(String namePrefix, Integer parallelism) {
+    return MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            Optional.ofNullable(parallelism).orElse(WORKER_THREAD_POOL_SIZE),
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat(namePrefix + "-%d")
+                .build()));
+  }
+
+  public static ExecutorService newKeyedWorkerPool(String key, String namePrefix, Integer parallelism) {

Review comment:
       @stevenzwu, I added the "release notes" tag to this PR and added it to the 0.14.0 release milestone so we add this to release notes. If you want, you can add a comment with the suggested release notes at the end.




[GitHub] [iceberg] rdblue commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r812198820



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
##########
@@ -79,6 +83,11 @@ public StreamingMonitorFunction(TableLoader tableLoader, ScanContext scanContext
     this.scanContext = scanContext;
   }
 
+  @Override
+  public void open(Configuration parameters) throws Exception {

Review comment:
       Should the pool size be configured by parameters?




[GitHub] [iceberg] yittg commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
yittg commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824348863



##########
File path: core/src/main/java/org/apache/iceberg/util/ThreadPools.java
##########
@@ -61,6 +62,26 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  public static ExecutorService newWorkerPool(String namePrefix, Integer parallelism) {
+    return MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            Optional.ofNullable(parallelism).orElse(WORKER_THREAD_POOL_SIZE),
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat(namePrefix + "-%d")
+                .build()));
+  }
+
+  public static ExecutorService newKeyedWorkerPool(String key, String namePrefix, Integer parallelism) {

Review comment:
       So **no** sharing of the kind mentioned in [this comment](https://github.com/apache/iceberg/pull/4177#discussion_r812199983) is needed at all, right?
   That makes sense to me. What's your opinion, @stevenzwu?




[GitHub] [iceberg] stevenzwu commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824155355



##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
##########
@@ -72,7 +73,8 @@ private SplitHelpers() {
       }
 
       final ScanContext scanContext = ScanContext.builder().build();
-      final List<IcebergSourceSplit> splits = FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext);
+      final List<IcebergSourceSplit> splits = FlinkSplitPlanner.planIcebergSourceSplits(
+          table, scanContext, ThreadPools.getWorkerPool());

Review comment:
       This uses the default/shared worker pool, while the others use a separate pool. What is the reason?




[GitHub] [iceberg] stevenzwu commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824155996



##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
##########
@@ -72,7 +73,8 @@ private SplitHelpers() {
       }
 
       final ScanContext scanContext = ScanContext.builder().build();
-      final List<IcebergSourceSplit> splits = FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext);
+      final List<IcebergSourceSplit> splits = FlinkSplitPlanner.planIcebergSourceSplits(
+          table, scanContext, ThreadPools.getWorkerPool());

Review comment:
       Never mind. I guess it is because this is a test class.




[GitHub] [iceberg] yittg commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
yittg commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824496501



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
##########
@@ -76,9 +78,12 @@ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
   public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
     // Called in Job manager, so it is OK to load table from catalog.
     tableLoader.open();
+    final ExecutorService workerPool = ThreadPools.newWorkerPool("iceberg-plan-worker-pool", context.planParallelism());
     try (TableLoader loader = tableLoader) {
       Table table = loader.loadTable();
-      return FlinkSplitPlanner.planInputSplits(table, context);
+      return FlinkSplitPlanner.planInputSplits(table, context, workerPool);
+    } finally {
+      workerPool.shutdown();

Review comment:
       This function is called in the client and in the job manager, so there is no runtime context (and no job ID) available here. Since this is an ad hoc pool that is shut down after planning, I think it is OK to name it this way.
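A minimal sketch of the ad hoc pool pattern in `createInputSplits()`: the pool is created for a single planning call and always shut down in `finally`. `AdHocPlanning.planSplits` is a hypothetical stand-in for `FlinkSplitPlanner.planInputSplits`, and the per-file "split" strings are placeholder work, not real planning.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class AdHocPlanning {
  private AdHocPlanning() {}

  // The pool lives only for the duration of this call.
  static List<String> planSplits(List<String> dataFiles, int poolSize) {
    AtomicInteger counter = new AtomicInteger();
    ExecutorService workerPool = Executors.newFixedThreadPool(poolSize, runnable -> {
      Thread thread = new Thread(runnable, "iceberg-plan-worker-pool-" + counter.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    });
    try {
      List<Future<String>> futures = new ArrayList<>();
      for (String file : dataFiles) {
        // Placeholder work; real planning would read manifests here.
        futures.add(workerPool.submit(() -> "split:" + file));
      }
      List<String> splits = new ArrayList<>();
      for (Future<String> future : futures) {
        splits.add(future.get());
      }
      return splits;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while planning splits", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Split planning task failed", e.getCause());
    } finally {
      // Always release the threads, whether planning succeeded or failed.
      workerPool.shutdown();
    }
  }
}
```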
   
   




[GitHub] [iceberg] stevenzwu commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824149811



##########
File path: core/src/main/java/org/apache/iceberg/util/ThreadPools.java
##########
@@ -61,6 +62,26 @@ public static ExecutorService getWorkerPool() {
     return WORKER_POOL;
   }
 
+  public static ExecutorService newWorkerPool(String namePrefix, Integer parallelism) {

Review comment:
       nit: is `poolSize` more intuitive than `parallelism`?




[GitHub] [iceberg] rdblue commented on a change in pull request #4177: Use particular worker pool for flink jobs

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4177:
URL: https://github.com/apache/iceberg/pull/4177#discussion_r824172039



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
##########
@@ -79,6 +83,16 @@ public StreamingMonitorFunction(TableLoader tableLoader, ScanContext scanContext
     this.scanContext = scanContext;
   }
 
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+
+    final String jobId = getRuntimeContext().getJobId().toString();
+    this.workerPool = ThreadPools.newKeyedWorkerPool(jobId, "flink-worker-pool", scanContext.planParallelism());
+    getRuntimeContext().registerUserCodeClassLoaderReleaseHookIfAbsent(
+        "release-flink-worker-pool", () -> ThreadPools.shutdownKeyedWorkerPool(jobId));

Review comment:
       Is the key here also going to be a problem? Or is this a description?
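Flink's `RuntimeContext.registerUserCodeClassLoaderReleaseHookIfAbsent(String, Runnable)` registers a release hook under a name. Assuming the `IfAbsent` semantics its name suggests, a hook is only added if none is registered under that name yet. Under that assumption, the first argument behaves like a key rather than a description. Below is a minimal, Flink-free model of that behavior; `ReleaseHookRegistry` is hypothetical. It shows that a second registration under the fixed name `release-flink-worker-pool` would be dropped.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of "register a release hook if absent"; not Flink code.
final class ReleaseHookRegistry {
  private final Map<String, Runnable> hooks = new LinkedHashMap<>();

  // Returns false, and ignores the hook, when the name is already taken.
  synchronized boolean registerIfAbsent(String name, Runnable hook) {
    return hooks.putIfAbsent(name, hook) == null;
  }

  // Models releasing the class loader: every registered hook runs once.
  synchronized void releaseAll() {
    hooks.values().forEach(Runnable::run);
    hooks.clear();
  }
}
```

If two registrations with different job IDs ever shared a class loader, only the first job's pool would be shut down. Making the hook name unique per pool (or letting each operator close its own pool) avoids that.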



