Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2021/11/02 16:56:04 UTC

[GitHub] [accumulo] dlmarion opened a new pull request #2340: Client-side thread pools

dlmarion opened a new pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340


   Changes based on discussion [here](https://github.com/apache/accumulo/issues/2331)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] ctubbsii commented on a change in pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#discussion_r741297197



##########
File path: core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java
##########
@@ -338,6 +338,13 @@ ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterCon
   @Override
   void close();
 
+  /**
+   * Sets a user-defined ClientThreadPools implementation
+   *
+   * @param impl

Review comment:
       javadoc quality: remove tag if no description, or add description

##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.clientImpl.ClientContext;
+
+public interface ClientThreadPools {
+
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);
+
+  /**
+   * ThreadPoolExecutor that runs bulk import tasks
+   *
+   * @param ctx
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBulkImportThreadPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction information
+   *
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getExternalCompactionActiveCompactionsPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getScannerReadAheadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for adding splits to a table
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getAddSplitsThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @param numQueryThreads
+   * @param batchReaderInstance
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchReaderThreadPool(ClientContext ctx, int numQueryThreads,
+      int batchReaderInstance);
+
+  /**
+   * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+   * goals.
+   * 
+   * @param ctx
+   *          client context object
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getBatchWriterLatencyTasksThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of binning mutations
+   * 
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterBinningThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+   *
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterSendThreadPool(ClientContext ctx, int numSendThreads);
+
+  /**
+   * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getConditionalWriterCleanupTaskThreadPool(ClientContext ctx);

Review comment:
       Having one central location that entangles itself with so many disparate public API methods seems less than ideal. I think it would be better if each entry point (batch scanner, batch writer, conditional writer, etc.) added an option to its existing builder/config to set a thread pool... or, more narrowly, just an uncaught exception handler for the thread pools we manage internally.
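
A minimal sketch of the narrower option mentioned above (a per-entry-point setting for the uncaught exception handler, with the library still owning pool creation) might look like the following. `SketchWriterConfig`, `newInternalPool`, and `demo` are hypothetical names used only for illustration; they are not part of the Accumulo API or of this PR.

```java
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical per-entry-point config object; not part of the Accumulo API.
final class SketchWriterConfig {
  // Default handler only logs; it never halts the JVM.
  private Thread.UncaughtExceptionHandler handler = (t, e) -> e.printStackTrace();

  SketchWriterConfig setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
    this.handler = Objects.requireNonNull(h);
    return this;
  }

  // The library still creates and owns the pool; only the handler is user-supplied.
  ThreadPoolExecutor newInternalPool(int numThreads) {
    Thread.UncaughtExceptionHandler h = handler;
    ThreadFactory tf = r -> {
      Thread t = new Thread(r, "sketch-writer");
      t.setDaemon(true);
      t.setUncaughtExceptionHandler(h);
      return t;
    };
    return new ThreadPoolExecutor(numThreads, numThreads, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(), tf);
  }

  // Runs one failing task and returns the message the handler observed (null on timeout).
  static String demo() {
    AtomicReference<String> seen = new AtomicReference<>();
    CountDownLatch done = new CountDownLatch(1);
    ThreadPoolExecutor pool = new SketchWriterConfig()
        .setUncaughtExceptionHandler((t, e) -> {
          seen.set(e.getMessage());
          done.countDown();
        }).newInternalPool(1);
    try {
      // execute(), not submit(): submit() wraps the task in a FutureTask,
      // which captures the throwable before the handler can see it.
      pool.execute(() -> {
        throw new IllegalStateException("boom");
      });
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      pool.shutdownNow();
    }
    return seen.get();
  }
}
```

Note that a handler configured this way only sees failures from tasks handed to the pool with `execute()`; tasks wrapped in futures report their failures through the future instead.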

##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);

Review comment:
       ClientContext is not public API, and should not be

##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/AccumuloUncaughtExceptionHandler.java
##########
@@ -44,7 +46,10 @@ public void uncaughtException(Thread t, Throwable e) {
         // If e == OutOfMemoryError, then it's probably that another Error might be
         // thrown when trying to print to System.err.
       } finally {
-        Runtime.getRuntime().halt(-1);
+        Mode m = SingletonManager.getMode();
+        if (m != null && m.equals(Mode.SERVER)) {
+          Runtime.getRuntime().halt(-1);
+        }

Review comment:
       Rather than making this handler behave differently in some circumstances, why not make it so a different handler can be configured instead?

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
##########
@@ -229,6 +228,7 @@ private Path checkPath(FileSystem fs, String dir) throws IOException, AccumuloEx
   }
 
   @Override
+  @Deprecated(since = "2.1.0")
   public ImportMappingOptions executor(Executor service) {

Review comment:
       It's not clear. Why is this option being deprecated in the bulk import builder?

##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+  /**
+   * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getConditionalWriterCleanupTaskThreadPool(ClientContext ctx);

Review comment:
       We did try to make those builders/config extensible, so they wouldn't have to modify anything unless they were taking advantage of the new feature... However, I think I have a better idea anyway.
   
   I noticed that this class is essentially a ThreadPoolExecutorFactory for either `ThreadPoolExecutor` or `ScheduledThreadPoolExecutor`. Instead of having so many disparate methods, we can make the interface substantially simpler, and still support all the different uses, by passing in some kind of context/scope, as in:
   
   ```java
       // this name is bad, but illustrates the idea; this could even be a String, if we don't want to constrain it
       enum ThreadPoolScope {
         BATCH_WRITER, BATCH_SCANNER, CONDITIONAL_WRITER;
       }
       ThreadPoolExecutor getExecutor(ThreadPoolScope scope, ConfigSource conf);
       ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor(ThreadPoolScope scope, ConfigSource conf);
   ```
   
   The main idea here is to keep the API as free of bloat as possible, but still be extensible and applicable to all our use cases.
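
As a rough illustration of the scope-keyed factory idea above, one default implementation could look like the sketch below. Everything here is an assumption made for the example: the `Sketch*` names, the use of a plain `Map<String,String>` in place of the unspecified `ConfigSource`, and the `"threads"` key are all hypothetical.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical scope enum, mirroring the one proposed in the comment.
enum SketchScope {
  BATCH_WRITER, BATCH_SCANNER, CONDITIONAL_WRITER
}

// Hypothetical two-method factory; a Map stands in for the unspecified ConfigSource.
interface SketchExecutorFactory {
  ThreadPoolExecutor getExecutor(SketchScope scope, Map<String,String> conf);

  ScheduledThreadPoolExecutor getScheduledExecutor(SketchScope scope, Map<String,String> conf);
}

final class DefaultSketchExecutorFactory implements SketchExecutorFactory {

  // "threads" is an invented key; it defaults to a single thread.
  private static int threads(Map<String,String> conf) {
    return Integer.parseInt(conf.getOrDefault("threads", "1"));
  }

  // Names threads after the scope so they are easy to spot in a thread dump.
  private static ThreadFactory named(SketchScope scope) {
    AtomicInteger counter = new AtomicInteger();
    String prefix = scope.name().toLowerCase(Locale.ROOT);
    return r -> {
      Thread t = new Thread(r, prefix + "-" + counter.incrementAndGet());
      t.setDaemon(true);
      return t;
    };
  }

  @Override
  public ThreadPoolExecutor getExecutor(SketchScope scope, Map<String,String> conf) {
    int n = threads(conf);
    return new ThreadPoolExecutor(n, n, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
        named(scope));
  }

  @Override
  public ScheduledThreadPoolExecutor getScheduledExecutor(SketchScope scope,
      Map<String,String> conf) {
    return new ScheduledThreadPoolExecutor(threads(conf), named(scope));
  }
}
```

A caller would ask for a pool with something like `factory.getExecutor(SketchScope.BATCH_WRITER, conf)`, and a user-supplied implementation could return differently tuned pools per scope.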

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
##########
@@ -229,6 +228,7 @@ private Path checkPath(FileSystem fs, String dir) throws IOException, AccumuloEx
   }
 
   @Override
+  @Deprecated(since = "2.1.0")
   public ImportMappingOptions executor(Executor service) {

Review comment:
       Okay, a javadoc deprecated note to point to the new API would be helpful here

##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/AccumuloUncaughtExceptionHandler.java
##########
@@ -44,7 +46,10 @@ public void uncaughtException(Thread t, Throwable e) {
         // If e == OutOfMemoryError, then it's probably that another Error might be
         // thrown when trying to print to System.err.
       } finally {
-        Runtime.getRuntime().halt(-1);
+        Mode m = SingletonManager.getMode();
+        if (m != null && m.equals(Mode.SERVER)) {
+          Runtime.getRuntime().halt(-1);
+        }

Review comment:
       Never mind on this. I think the current idea of providing the executor factory is better.







[GitHub] [accumulo] dlmarion commented on a change in pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#discussion_r742003836



##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+  /**
+   * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getConditionalWriterCleanupTaskThreadPool(ClientContext ctx);

Review comment:
       I have some changes locally where I'm cleaning up potentially unclosed thread pools (for example, if someone did not call close). This means that clients might be able to specify their own implementation, but they would not be able to share the thread pool within the client or with the client's application. I'm starting to think that the only thing a user would be able to configure is the number of threads in the pools. Currently, the number of threads is either hardcoded or based on a property.







[GitHub] [accumulo] keith-turner commented on pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
keith-turner commented on pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#issuecomment-959342501


   >Regarding the BulkImport code above, I believe that in this case an Error would be handled by the AccumuloUncaughtExceptionHandler as this catches Exception, not Error, and the executor that is being used to run the CompletableFuture is being created by ThreadPools.
   
   When dealing with futures in Java, Java will take your runnable, supplier, callable, etc. and wrap it with one of the following before queueing it on the executor. That wrapper code catches Throwable before the executor can ever see it.
   
   
   https://github.com/openjdk/jdk/blob/jdk-11%2B28/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java#L1683-L1708
   
   https://github.com/openjdk/jdk/blob/da75f3c4ad5bdf25167a3ed80e51f567ab3dbd01/src/java.base/share/classes/java/util/concurrent/FutureTask.java#L263-L270
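
The behaviour described above can be reproduced in isolation with the standard library alone. The sketch below throws a simulated `Error` from a `CompletableFuture` task running on a pool whose threads carry an uncaught exception handler; the class name `FutureErrorDemo` is made up for the example, and nothing here is Accumulo code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

final class FutureErrorDemo {

  // Reports what get() surfaced and whether the thread's handler was ever invoked.
  static String run() {
    AtomicBoolean handlerCalled = new AtomicBoolean(false);
    ThreadFactory tf = r -> {
      Thread t = new Thread(r, "demo-worker");
      t.setDaemon(true);
      t.setUncaughtExceptionHandler((thread, e) -> handlerCalled.set(true));
      return t;
    };
    ExecutorService pool = Executors.newFixedThreadPool(1, tf);
    try {
      CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
        // Simulated Error; no memory is actually exhausted.
        throw new OutOfMemoryError("simulated");
      }, pool);
      f.get(5, TimeUnit.SECONDS);
      return "no failure";
    } catch (ExecutionException e) {
      // The future captured the Error; the worker thread never saw it escape.
      return e.getCause().getClass().getSimpleName() + ", handlerCalled=" + handlerCalled.get();
    } catch (InterruptedException | TimeoutException e) {
      return "unexpected: " + e;
    } finally {
      pool.shutdownNow();
    }
  }
}
```

Calling `FutureErrorDemo.run()` returns `OutOfMemoryError, handlerCalled=false`: the `Error` is only visible as the cause of the `ExecutionException`, and the handler installed on the pool's thread never fires.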
   
   
   >> If the goal of this change is to make it so that Accumulo client code never has to catch Error
   >
   > This change is to implement your suggestion in #2331 where you said:
   >
   >> I am not sure its appropriate for this particular case but one thing to consider in general from an API perspective is letting 
   >> users pass in ExecutorServices and Thread Factories.
   
   Yeah, I realize that. I was uncertain about the potential solution then, and I am still uncertain about it. This PR has helped me think about it more deeply and see nuances that I was not aware of before. For example, before this PR I never realized how complex the thread pool creation process was, and I thought a thread pool factory might be a simple solution to the problem. While looking at this PR, I was also thinking about instrumentation vs. creation.
   
   Here is the instrumentation idea that I thought of; I am also very uncertain about whether it's a good solution. Instead of a thread pool factory, we could create thread pools in Accumulo client code and provide a mechanism for users to instrument those thread pools, like the following.
   
   ```java
   interface AccumuloClientInstrumentation {
      ThreadPoolExecutor instrument(ThreadPoolExecutor tpe);
      ScheduledThreadPoolExecutor instrument(ScheduledThreadPoolExecutor tpe);
   }
   ```
   
   ```java
   class AccumuloClient {
       /**
        * When the following is set, Accumulo will call the instrumentation object whenever it creates any object that can be instrumented by the interface.
        */
       void enableInstrumentation(AccumuloClientInstrumentation aci);
   
   }
   ```
   
   The thing I don't like about this is that it seems like it could break Accumulo; maybe that is OK, though. It also seems like it could be tricky to use correctly, like the thread pool factory concept. I would not know if it's workable/good without trying it, the way this PR is trying thread pool factories.
   
   
   





[GitHub] [accumulo] dlmarion edited a comment on pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion edited a comment on pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#issuecomment-959379417


   Thanks for the clarification; I missed that CompletableFuture was catching Throwable. In that case, I would say that the BulkImport code is incorrect. In the code section you [referenced](https://github.com/apache/accumulo/blob/785a3645261571f000a7adb3c7c72b07886f0587/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java#L577-L579), should it not get the cause and throw it if it's an Error?
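
One way to read the suggestion above (get the cause and rethrow it when it is an `Error`) is sketched below. This is not the BulkImport code; `FutureResults` and `getOrRethrowError` are hypothetical names, and the sketch only shows the unwrapping pattern using standard `java.util.concurrent` types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class FutureResults {

  // Waits for the future; if it failed with an Error, rethrows that Error unwrapped.
  // Any other failure is left wrapped in the ExecutionException.
  static <T> T getOrRethrowError(Future<T> future)
      throws ExecutionException, InterruptedException {
    try {
      return future.get();
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof Error) {
        throw (Error) cause;
      }
      throw e;
    }
  }

  // Small demonstration using an already-failed future.
  static String demo() {
    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new OutOfMemoryError("simulated"));
    try {
      return getOrRethrowError(failed);
    } catch (Error err) {
      return "rethrown: " + err.getMessage();
    } catch (ExecutionException | InterruptedException e) {
      return "wrapped: " + e;
    }
  }
}
```

With this pattern the `Error` propagates on the calling thread, so whatever policy that thread has for fatal errors applies, instead of the failure being hidden inside a wrapped exception.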





[GitHub] [accumulo] keith-turner commented on pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
keith-turner commented on pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#issuecomment-959157451









[GitHub] [accumulo] keith-turner commented on a change in pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#discussion_r741996230



##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+  /**
+   * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getConditionalWriterCleanupTaskThreadPool(ClientContext ctx);

Review comment:
       The enums do not properly capture all of the complexity. For example, I see that one enum uses a SynchronousQueue and another uses 0 to 3 threads. It seems like the enum could be dropped and everything that is needed for creation pushed into the config object, though maybe that becomes too complex for anyone to ever use properly.
   
   







[GitHub] [accumulo] dlmarion commented on pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion commented on pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#issuecomment-958932073


   Thanks for the TableOperations reference. I added a deprecated annotation there too.








[GitHub] [accumulo] keith-turner commented on pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
keith-turner commented on pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#issuecomment-959484271


   >  In the code section you referenced should it not get the cause and throw it if it's an Error?
   
   I would not want to get the cause and then drop the ExecutionException. The cause only has a stack trace for the background thread. The stack trace in the ExecutionException usually provides the path from the user code to the point where something was put on a thread pool.
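
The difference between the two stack traces can be seen in a small stand-alone sketch. It uses only JDK classes; the class and method names (ExecutionExceptionTraceDemo, callFromUserCode, traceMentions) are made up for illustration and are not part of Accumulo.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ExecutionExceptionTraceDemo {

  // Hypothetical stand-in for user code that submits work and waits for it.
  static ExecutionException callFromUserCode() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      Callable<Void> task = () -> {
        throw new IllegalStateException("failed in background thread");
      };
      Future<Void> future = pool.submit(task);
      future.get(); // the ExecutionException is created here, on the calling thread
      throw new AssertionError("expected the task to fail");
    } catch (ExecutionException e) {
      return e;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      pool.shutdownNow();
    }
  }

  // True if any frame of the throwable's own stack trace is the named method.
  static boolean traceMentions(Throwable t, String methodName) {
    for (StackTraceElement frame : t.getStackTrace()) {
      if (frame.getMethodName().equals(methodName)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    ExecutionException wrapper = callFromUserCode();
    System.out.println("wrapper trace includes caller: "
        + traceMentions(wrapper, "callFromUserCode"));
    System.out.println("cause trace includes caller: "
        + traceMentions(wrapper.getCause(), "callFromUserCode"));
  }
}
```

Rethrowing only the cause would lose the frames that lead from the caller to the Future.get() call; wrapping or rethrowing the ExecutionException keeps both traces.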





[GitHub] [accumulo] keith-turner commented on a change in pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#discussion_r741996230



##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.clientImpl.ClientContext;
+
+public interface ClientThreadPools {
+
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);
+
+  /**
+   * ThreadPoolExecutor that runs bulk import tasks
+   *
+   * @param ctx
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBulkImportThreadPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction information
+   *
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getExternalCompactionActiveCompactionsPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getScannerReadAheadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for adding splits to a table
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getAddSplitsThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @param numQueryThreads
+   * @param batchReaderInstance
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchReaderThreadPool(ClientContext ctx, int numQueryThreads,
+      int batchReaderInstance);
+
+  /**
+   * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+   * goals.
+   * 
+   * @param ctx
+   *          client context object
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getBatchWriterLatencyTasksThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of binning mutations
+   * 
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterBinningThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+   *
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterSendThreadPool(ClientContext ctx, int numSendThreads);
+
+  /**
+   * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getConditionalWriterCleanupTaskThreadPool(ClientContext ctx);

Review comment:
       The enums do not properly capture all of the complexity. For example, I see that one enum uses a SynchronousQueue and another uses 0 to 3 threads. It seems like the enum could be dropped and everything that is needed for creation pushed into the config object, though maybe that becomes too complex for anyone to ever use properly.
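
A rough sketch of what "everything needed for creation in the config object" might look like. PoolSpec and its fields are hypothetical and do not appear in the PR; the two example specs only mirror the cases mentioned above (a SynchronousQueue hand-off pool and a pool that runs between 0 and 3 threads), and their numbers are made up.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class PoolSpecSketch {

  // Hypothetical config object: carries every creation parameter, no usage enum.
  static final class PoolSpec {
    final int coreThreads;
    final int maxThreads;
    final long keepAliveSeconds;
    final boolean allowCoreThreadTimeOut;
    final Supplier<BlockingQueue<Runnable>> queueFactory;

    PoolSpec(int coreThreads, int maxThreads, long keepAliveSeconds,
        boolean allowCoreThreadTimeOut, Supplier<BlockingQueue<Runnable>> queueFactory) {
      this.coreThreads = coreThreads;
      this.maxThreads = maxThreads;
      this.keepAliveSeconds = keepAliveSeconds;
      this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
      this.queueFactory = queueFactory;
    }

    ThreadPoolExecutor build() {
      ThreadPoolExecutor pool = new ThreadPoolExecutor(coreThreads, maxThreads,
          keepAliveSeconds, TimeUnit.SECONDS, queueFactory.get());
      pool.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
      return pool;
    }
  }

  // Hand-off pool: no queueing, a thread is created or reused for each task.
  static PoolSpec handOff() {
    return new PoolSpec(0, Integer.MAX_VALUE, 60, false, SynchronousQueue::new);
  }

  // Up to 3 threads that all time out when idle, so the pool can shrink to 0.
  static PoolSpec zeroToThree() {
    return new PoolSpec(3, 3, 60, true, LinkedBlockingQueue::new);
  }

  public static void main(String[] args) {
    ThreadPoolExecutor a = handOff().build();
    ThreadPoolExecutor b = zeroToThree().build();
    System.out.println(a.getQueue().getClass().getSimpleName());
    System.out.println(b.getMaximumPoolSize());
    a.shutdown();
    b.shutdown();
  }
}
```

Even this small version needs five parameters per pool, which illustrates the concern that such a config object could become hard to use correctly.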
   
   







[GitHub] [accumulo] dlmarion closed pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion closed pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340


   





[GitHub] [accumulo] dlmarion commented on pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion commented on pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#issuecomment-959209736


   > If the goal of this change is to make it so that Accumulo client code never has to catch Error
   
   This change is to implement your suggestion in #2331 where you said:
   
   > I am not sure its appropriate for this particular case but one thing to consider in general from an API perspective is letting users pass in ExecutorServices and Thread Factories.
   
   There is also a change in this PR to AccumuloUncaughtExceptionHandler where the VM will not be shut down on Error when the thread is not running in an Accumulo Server process (as determined by SingletonManager.getMode).
   
   Regarding the BulkImport code above, I believe that in this case an Error would be handled by the AccumuloUncaughtExceptionHandler as [this](https://github.com/apache/accumulo/blob/785a3645261571f000a7adb3c7c72b07886f0587/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java#L560) catches Exception, not Error, and the executor that is being used to run the CompletableFuture is being created by ThreadPools.
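
One way to check this in isolation is a plain-JDK sketch with no Accumulo code. The handler below is a hypothetical stand-in for AccumuloUncaughtExceptionHandler that only records whether it was called, and failingTask is a made-up task body. In this setup the catch (Exception e) block is skipped as expected, but the Error is captured by the CompletableFuture and surfaces from join() instead of reaching the thread's uncaught exception handler. Executors created by ThreadPools may wrap tasks differently, so this says nothing definite about the BulkImport case.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class ErrorInSupplyAsyncDemo {

  // What the calling thread saw from join() and whether the handler ran.
  static final class Outcome {
    final Throwable causeSeenByJoin;
    final boolean handlerInvoked;

    Outcome(Throwable causeSeenByJoin, boolean handlerInvoked) {
      this.causeSeenByJoin = causeSeenByJoin;
      this.handlerInvoked = handlerInvoked;
    }
  }

  // Hypothetical task body that fails with an Error rather than an Exception.
  static String failingTask() {
    throw new Error("simulated failure");
  }

  static Outcome run() {
    AtomicBoolean handlerInvoked = new AtomicBoolean(false);
    // Stand-in handler: records the call instead of halting the VM.
    ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "demo-worker");
      t.setUncaughtExceptionHandler((thread, err) -> handlerInvoked.set(true));
      return t;
    });
    Throwable cause = null;
    try {
      CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
          return failingTask();
        } catch (Exception e) { // an Error does not match this clause
          throw new CompletionException(e);
        }
      }, pool);
      future.join();
    } catch (CompletionException e) {
      cause = e.getCause();
    } finally {
      pool.shutdown();
    }
    try {
      // Let the worker thread exit before reading the flag.
      pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return new Outcome(cause, handlerInvoked.get());
  }

  public static void main(String[] args) {
    Outcome outcome = run();
    System.out.println("cause seen by join(): " + outcome.causeSeenByJoin);
    System.out.println("handler invoked: " + outcome.handlerInvoked);
  }
}
```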








[GitHub] [accumulo] dlmarion commented on a change in pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#discussion_r741828909



##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+public interface ClientThreadPools {
+
+  class ThreadPoolConfig {
+
+    public static final ThreadPoolConfig EMPTY_CONFIG =
+        new ThreadPoolConfig(Optional.empty(), Optional.empty(), Optional.empty());
+
+    private final Optional<Iterable<Entry<String,String>>> configuration;
+    private final Optional<Integer> numThreads;
+    private final Optional<String> threadName;
+
+    public ThreadPoolConfig(Iterable<Entry<String,String>> configuration) {
+      this(Optional.of(configuration), Optional.empty(), Optional.empty());
+    }
+
+    public ThreadPoolConfig(Iterable<Entry<String,String>> configuration, int numThreads) {
+      this(Optional.of(configuration), Optional.of(numThreads), Optional.empty());
+    }
+
+    public ThreadPoolConfig(Iterable<Entry<String,String>> configuration, int numThreads,
+        String threadName) {
+      this(Optional.of(configuration), Optional.of(numThreads), Optional.of(threadName));
+    }
+
+    private ThreadPoolConfig(Optional<Iterable<Entry<String,String>>> configuration,
+        Optional<Integer> numThreads, Optional<String> threadName) {
+      this.configuration = configuration;
+      this.numThreads = numThreads;
+      this.threadName = threadName;
+    }
+
+    public Optional<Iterable<Entry<String,String>>> getConfiguration() {
+      return configuration;
+    }
+
+    public Optional<Integer> getNumThreads() {
+      return numThreads;
+    }
+
+    public Optional<String> getThreadName() {
+      return threadName;
+    }
+  }
+
+  enum ThreadPoolUsage {
+    /**
+     * ThreadPoolExecutor that runs bulk import tasks
+     */
+    BULK_IMPORT_POOL,
+    /**
+     * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction
+     * information
+     */
+    ACTIVE_EXTERNAL_COMPACTION_POOL,
+    /**
+     * ThreadPoolExecutor used for fetching data from the TabletServers
+     */
+    SCANNER_READ_AHEAD_POOL,
+    /**
+     * ThreadPoolExecutor used for adding splits to a table
+     */
+    ADD_SPLITS_THREAD_POOL,
+    /**
+     * ThreadPoolExecutor used for fetching data from the TabletServers
+     */
+    BATCH_SCANNER_READ_AHEAD_POOL,
+    /**
+     * ThreadPoolExecutor that runs the tasks of binning mutations
+     */
+    BATCH_WRITER_BINNING_POOL,
+    /**
+     * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+     */
+    BATCH_WRITER_SEND_POOL,
+    /**
+     * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+     */
+    CONDITIONAL_WRITER_CLEANUP_TASK_POOL,
+    /**
+     * ThreadPoolExecutor responsible for loading bloom filters
+     */
+    BLOOM_FILTER_LAYER_LOADER_POOL
+  }
+
+  enum ScheduledThreadPoolUsage {
+    /**
+     * shared scheduled executor for trivial tasks
+     */
+    SHARED_GENERAL_SCHEDULED_TASK_POOL,
+    /**
+     * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+     * goals.
+     */
+    BATCH_WRITER_LATENCY_TASK_POOL,
+    /**
+     * ScheduledThreadPoolExecutor that periodically runs tasks to handle failed write mutations and
+     * send mutations to TabletServers
+     */
+    CONDITIONAL_WRITER_RETRY_POOL
+  }
+
+  /**
+   * return a ThreadPoolExecutor configured for the specified usage
+   *
+   * @param usage
+   *          thread pool usage
+   * @param config
+   *          thread pool configuration
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getThreadPool(ThreadPoolUsage usage, ThreadPoolConfig config);

Review comment:
       > Could an ExecutorService type be returned instead of ThreadPoolExecutor?
   
   There are some methods on the ScheduledThreadPoolExecutor, like remove(Runnable), that aren't on the ExecutorService interface.
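
A minimal sketch of the point about remove(Runnable), using only JDK classes. The class and method names are made up for illustration. The call compiles only because the variable is typed as ScheduledThreadPoolExecutor; with a variable typed as ExecutorService or ScheduledExecutorService, remove would not be available.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RemoveScheduledTaskDemo {

  // Schedules a far-future task, then removes it from the executor's queue.
  static boolean scheduleThenRemove() {
    ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
    try {
      ScheduledFuture<?> pending = pool.schedule(() -> {}, 1, TimeUnit.HOURS);
      // remove(Runnable) is declared on ThreadPoolExecutor, not on ExecutorService.
      // The future returned by schedule() is the queued task itself, so it can be
      // passed back in as the Runnable to remove.
      boolean removed = pool.remove((Runnable) pending);
      return removed && pool.getQueue().isEmpty();
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("removed: " + scheduleThenRemove());
  }
}
```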

##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.clientImpl.ClientContext;
+
+public interface ClientThreadPools {
+
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);
+
+  /**
+   * ThreadPoolExecutor that runs bulk import tasks
+   *
+   * @param ctx
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBulkImportThreadPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction information
+   *
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getExternalCompactionActiveCompactionsPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getScannerReadAheadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for adding splits to a table
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getAddSplitsThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @param numQueryThreads
+   * @param batchReaderInstance
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchReaderThreadPool(ClientContext ctx, int numQueryThreads,
+      int batchReaderInstance);
+
+  /**
+   * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+   * goals.
+   * 
+   * @param ctx
+   *          client context object
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getBatchWriterLatencyTasksThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of binning mutations
+   * 
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterBinningThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+   *
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterSendThreadPool(ClientContext ctx, int numSendThreads);
+
+  /**
+   * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getConditionalWriterCleanupTaskThreadPool(ClientContext ctx);

Review comment:
       I'm not sure how we do that. We need to pass enough information to determine which pool to return and how to configure it. I'm open to suggestions.

##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.clientImpl.ClientContext;
+
+public interface ClientThreadPools {
+
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);
+
+  /**
+   * ThreadPoolExecutor that runs bulk import tasks
+   *
+   * @param ctx
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBulkImportThreadPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction information
+   *
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getExternalCompactionActiveCompactionsPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getScannerReadAheadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for adding splits to a table
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getAddSplitsThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @param numQueryThreads
+   * @param batchReaderInstance
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchReaderThreadPool(ClientContext ctx, int numQueryThreads,
+      int batchReaderInstance);
+
+  /**
+   * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+   * goals.
+   * 
+   * @param ctx
+   *          client context object
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getBatchWriterLatencyTasksThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of binning mutations
+   * 
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterBinningThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+   *
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterSendThreadPool(ClientContext ctx, int numSendThreads);
+
+  /**
+   * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getConditionalWriterCleanupTaskThreadPool(ClientContext ctx);

Review comment:
       I have some changes locally where I'm cleaning up potentially unclosed thread pools (for example, if someone did not call close). This means that clients might be able to specify their own implementation, but they would not be able to share the thread pool within the client or with the client's application. I'm starting to think that the only thing a user would be able to configure is the number of threads in the pools. Currently the number of threads is either hardcoded or based on a property.
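
If only the thread count were configurable, the lookup could be as small as the sketch below. The property key and the resolveThreads helper are hypothetical and are not real Accumulo properties or APIs; the hardcoded default stands in for the values currently fixed in the client code.

```java
import java.util.Properties;

final class ThreadCountSketch {

  // Hypothetical property key, not a real Accumulo client property.
  static final String KEY = "example.client.pool.threads";

  // Returns the configured thread count, or the hardcoded default when unset.
  static int resolveThreads(Properties props, int hardcodedDefault) {
    String raw = props.getProperty(KEY);
    if (raw == null || raw.trim().isEmpty()) {
      return hardcodedDefault;
    }
    int threads = Integer.parseInt(raw.trim());
    if (threads < 1) {
      throw new IllegalArgumentException(KEY + " must be at least 1, got " + threads);
    }
    return threads;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    System.out.println(resolveThreads(props, 4));
    props.setProperty(KEY, "8");
    System.out.println(resolveThreads(props, 4));
  }
}
```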







[GitHub] [accumulo] dlmarion edited a comment on pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion edited a comment on pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#issuecomment-959817323


   What are your thoughts about using [CompletableFuture.completeExceptionally(Throwable ex)](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CompletableFuture.html#completeExceptionally(java.lang.Throwable)) instead of doing:
   
   ```
      } catch (Exception e) { 
        throw new CompletionException(e); 
      } 
   ```
   
   Edit: Never mind, it looks like they both end up doing the same thing.
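
A small JDK-only sketch comparing the two approaches above. The class and method names are made up for illustration. In both cases join() throws a CompletionException whose cause is the original exception, which matches the conclusion in the edit.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class CompletionStyleDemo {

  // Style 1: wrap the failure in a CompletionException inside the task.
  static CompletableFuture<String> viaThrow(Exception failure) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        throw failure;
      } catch (Exception e) {
        throw new CompletionException(e);
      }
    });
  }

  // Style 2: complete the future exceptionally with the failure itself.
  static CompletableFuture<String> viaCompleteExceptionally(Exception failure) {
    CompletableFuture<String> future = new CompletableFuture<>();
    future.completeExceptionally(failure);
    return future;
  }

  // Returns the cause reported by join(), or null if the future succeeded.
  static Throwable causeFromJoin(CompletableFuture<?> future) {
    try {
      future.join();
      return null;
    } catch (CompletionException e) {
      return e.getCause();
    }
  }

  public static void main(String[] args) {
    IOException failure = new IOException("simulated");
    System.out.println(causeFromJoin(viaThrow(failure)) == failure);
    System.out.println(causeFromJoin(viaCompleteExceptionally(failure)) == failure);
  }
}
```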





[GitHub] [accumulo] dlmarion commented on pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion commented on pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#issuecomment-961035548


   Closing, will resubmit a new PR





[GitHub] [accumulo] dlmarion commented on a change in pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#discussion_r741323905



##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.clientImpl.ClientContext;
+
+public interface ClientThreadPools {
+
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);

Review comment:
       Fixed in 94a6163. I should have waited until the checks were done before asking you to review.

##########
File path: core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java
##########
@@ -338,6 +338,13 @@ ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterCon
   @Override
   void close();
 
+  /**
+   * Sets a user-defined ClientThreadPools implementation
+   *
+   * @param impl

Review comment:
       Fixed in 94a6163. I should have waited until the checks were done before asking you to review.

##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.clientImpl.ClientContext;
+
+public interface ClientThreadPools {
+
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);
+
+  /**
+   * ThreadPoolExecutor that runs bulk import tasks
+   *
+   * @param ctx
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBulkImportThreadPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction information
+   *
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getExternalCompactionActiveCompactionsPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getScannerReadAheadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for adding splits to a table
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getAddSplitsThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @param numQueryThreads
+   * @param batchReaderInstance
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchReaderThreadPool(ClientContext ctx, int numQueryThreads,
+      int batchReaderInstance);
+
+  /**
+   * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+   * goals.
+   * 
+   * @param ctx
+   *          client context object
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getBatchWriterLatencyTasksThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of binning mutations
+   * 
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterBinningThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+   *
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterSendThreadPool(ClientContext ctx, int numSendThreads);
+
+  /**
+   * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getConditionalWriterCleanupTaskThreadPool(ClientContext ctx);

Review comment:
       I was trying to reduce the number of places where users would have to modify their pre-2.1.0 code. This approach lets them supply their own implementation once, when they create the client, instead of specifying one for each scanner, writer, etc. Additionally, by creating their own thread pools, they can set their own uncaught exception handler.
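
To illustrate the last point, here is a minimal sketch of a user-built pool whose threads carry a custom uncaught exception handler. It uses only JDK classes; `HandlerPoolDemo`, `newPool`, and `runFailingTask` are hypothetical names and not part of Accumulo or this PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper; the names are illustrative and not part of Accumulo.
class HandlerPoolDemo {

  // Builds a fixed-size pool whose worker threads report uncaught exceptions to the handler.
  static ThreadPoolExecutor newPool(int numThreads, Thread.UncaughtExceptionHandler handler) {
    ThreadFactory factory = runnable -> {
      Thread t = new Thread(runnable, "user-defined-pool");
      t.setDaemon(true);
      t.setUncaughtExceptionHandler(handler);
      return t;
    };
    return new ThreadPoolExecutor(numThreads, numThreads, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(), factory);
  }

  // Runs a failing task via execute() and returns what the handler observed.
  static Throwable runFailingTask() {
    AtomicReference<Throwable> seen = new AtomicReference<>();
    CountDownLatch handled = new CountDownLatch(1);
    ThreadPoolExecutor pool = newPool(1, (thread, error) -> {
      seen.set(error);
      handled.countDown();
    });
    try {
      // execute() lets the exception escape to the handler; submit() would
      // capture it in the returned Future instead.
      pool.execute(() -> {
        throw new IllegalStateException("simulated task failure");
      });
      handled.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      pool.shutdownNow();
    }
    return seen.get();
  }

  public static void main(String[] args) {
    System.out.println("handler saw: " + runFailingTask());
  }
}
```

Note that the handler only fires for tasks started with `execute()`. Tasks started with `submit()` store their exception in the returned `Future`, so a custom handler alone would not observe those failures.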

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
##########
@@ -229,6 +228,7 @@ private Path checkPath(FileSystem fs, String dir) throws IOException, AccumuloEx
   }
 
   @Override
+  @Deprecated(since = "2.1.0")
   public ImportMappingOptions executor(Executor service) {

Review comment:
       It's available in the ClientThreadPools implementation.

##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.clientImpl.ClientContext;
+
+public interface ClientThreadPools {
+
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);
+
+  /**
+   * ThreadPoolExecutor that runs bulk import tasks
+   *
+   * @param ctx
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBulkImportThreadPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction information
+   *
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getExternalCompactionActiveCompactionsPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getScannerReadAheadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for adding splits to a table
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getAddSplitsThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @param numQueryThreads
+   * @param batchReaderInstance
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchReaderThreadPool(ClientContext ctx, int numQueryThreads,
+      int batchReaderInstance);
+
+  /**
+   * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+   * goals.
+   * 
+   * @param ctx
+   *          client context object
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getBatchWriterLatencyTasksThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of binning mutations
+   * 
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterBinningThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+   *
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterSendThreadPool(ClientContext ctx, int numSendThreads);
+
+  /**
+   * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getConditionalWriterCleanupTaskThreadPool(ClientContext ctx);

Review comment:
       See 2effcce.

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
##########
@@ -229,6 +228,7 @@ private Path checkPath(FileSystem fs, String dir) throws IOException, AccumuloEx
   }
 
   @Override
+  @Deprecated(since = "2.1.0")
   public ImportMappingOptions executor(Executor service) {

Review comment:
       Resolved in 6c996df







[GitHub] [accumulo] keith-turner commented on a change in pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#discussion_r741474971



##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+public interface ClientThreadPools {
+
+  class ThreadPoolConfig {
+
+    public static final ThreadPoolConfig EMPTY_CONFIG =
+        new ThreadPoolConfig(Optional.empty(), Optional.empty(), Optional.empty());
+
+    private final Optional<Iterable<Entry<String,String>>> configuration;
+    private final Optional<Integer> numThreads;
+    private final Optional<String> threadName;
+
+    public ThreadPoolConfig(Iterable<Entry<String,String>> configuration) {
+      this(Optional.of(configuration), Optional.empty(), Optional.empty());
+    }
+
+    public ThreadPoolConfig(Iterable<Entry<String,String>> configuration, int numThreads) {
+      this(Optional.of(configuration), Optional.of(numThreads), Optional.empty());
+    }
+
+    public ThreadPoolConfig(Iterable<Entry<String,String>> configuration, int numThreads,
+        String threadName) {
+      this(Optional.of(configuration), Optional.of(numThreads), Optional.of(threadName));
+    }
+
+    private ThreadPoolConfig(Optional<Iterable<Entry<String,String>>> configuration,
+        Optional<Integer> numThreads, Optional<String> threadName) {
+      this.configuration = configuration;
+      this.numThreads = numThreads;
+      this.threadName = threadName;
+    }
+
+    public Optional<Iterable<Entry<String,String>>> getConfiguration() {
+      return configuration;
+    }
+
+    public Optional<Integer> getNumThreads() {
+      return numThreads;
+    }
+
+    public Optional<String> getThreadName() {
+      return threadName;
+    }
+  }
+
+  enum ThreadPoolUsage {
+    /**
+     * ThreadPoolExecutor that runs bulk import tasks
+     */
+    BULK_IMPORT_POOL,
+    /**
+     * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction
+     * information
+     */
+    ACTIVE_EXTERNAL_COMPACTION_POOL,
+    /**
+     * ThreadPoolExecutor used for fetching data from the TabletServers
+     */
+    SCANNER_READ_AHEAD_POOL,
+    /**
+     * ThreadPoolExecutor used for adding splits to a table
+     */
+    ADD_SPLITS_THREAD_POOL,
+    /**
+     * ThreadPoolExecutor used for fetching data from the TabletServers
+     */
+    BATCH_SCANNER_READ_AHEAD_POOL,
+    /**
+     * ThreadPoolExecutor that runs the tasks of binning mutations
+     */
+    BATCH_WRITER_BINNING_POOL,
+    /**
+     * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+     */
+    BATCH_WRITER_SEND_POOL,
+    /**
+     * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+     */
+    CONDITIONAL_WRITER_CLEANUP_TASK_POOL,
+    /**
+     * ThreadPoolExecutor responsible for loading bloom filters
+     */
+    BLOOM_FILTER_LAYER_LOADER_POOL
+  }
+
+  enum ScheduledThreadPoolUsage {
+    /**
+     * shared scheduled executor for trivial tasks
+     */
+    SHARED_GENERAL_SCHEDULED_TASK_POOL,
+    /**
+     * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+     * goals.
+     */
+    BATCH_WRITER_LATENCY_TASK_POOL,
+    /**
+     * ScheduledThreadPoolExecutor that periodically runs tasks to handle failed write mutations and
+     * send mutations to TabletServers
+     */
+    CONDITIONAL_WRITER_RETRY_POOL
+  }
+
+  /**
+   * return a ThreadPoolExecutor configured for the specified usage
+   *
+   * @param usage
+   *          thread pool usage
+   * @param config
+   *          thread pool configuration
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getThreadPool(ThreadPoolUsage usage, ThreadPoolConfig config);

Review comment:
       Could an ExecutorService type be returned instead of ThreadPoolExecutor? The Javadocs need to clearly state that Accumulo will shut these thread pools down as needed (if that is what will happen). It also seems like a name such as `newExecutorService` or `createExecutorService` would be better than `get`.
   
   ```suggestion
     ExecutorService createExecutorService(ThreadPoolUsage usage, ThreadPoolConfig config);
   ```
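
A rough sketch of what this suggestion could look like in use, assuming the caller (standing in for Accumulo) owns the executor and shuts it down. `ExecutorFactoryDemo`, `PoolUsage`, `ExecutorFactory`, and `runWith` are hypothetical stand-ins, simplified to take a thread count instead of a `ThreadPoolConfig`; they are not the API from this PR.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; these types mirror the suggestion and are not the Accumulo API.
class ExecutorFactoryDemo {

  enum PoolUsage { BULK_IMPORT_POOL, ADD_SPLITS_THREAD_POOL }

  interface ExecutorFactory {
    /**
     * Creates a new executor for the given usage. In this sketch the caller owns the
     * returned executor and is responsible for shutting it down.
     */
    ExecutorService createExecutorService(PoolUsage usage, int numThreads);
  }

  // A user-supplied factory that ignores the usage and returns a fixed-size pool.
  static ExecutorFactory fixedPoolFactory() {
    return (usage, numThreads) -> Executors.newFixedThreadPool(numThreads);
  }

  // Uses the factory the way a client library might: create, use, shut down.
  static int runWith(ExecutorFactory factory) {
    ExecutorService pool = factory.createExecutorService(PoolUsage.BULK_IMPORT_POOL, 2);
    try {
      return pool.submit(() -> 6 * 7).get(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException | TimeoutException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(runWith(fixedPoolFactory()));
  }
}
```

A `create...` name signals that each call hands back a fresh executor, which is what makes it safe for the caller to shut it down. That would not hold if an implementation returned a shared pool, so the Javadoc would need to say which behavior is expected.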

##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.clientImpl.ClientContext;
+
+public interface ClientThreadPools {
+
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);
+
+  /**
+   * ThreadPoolExecutor that runs bulk import tasks
+   *
+   * @param ctx
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBulkImportThreadPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction information
+   *
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getExternalCompactionActiveCompactionsPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getScannerReadAheadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for adding splits to a table
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getAddSplitsThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @param numQueryThreads
+   * @param batchReaderInstance
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchReaderThreadPool(ClientContext ctx, int numQueryThreads,
+      int batchReaderInstance);
+
+  /**
+   * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+   * goals.
+   * 
+   * @param ctx
+   *          client context object
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getBatchWriterLatencyTasksThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of binning mutations
+   * 
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterBinningThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+   *
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterSendThreadPool(ClientContext ctx, int numSendThreads);
+
+  /**
+   * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getConditionalWriterCleanupTaskThreadPool(ClientContext ctx);

Review comment:
       > The main idea here is to keep the API as free of bloat as possible
   
   It would be nice to trim this down. I'm not sure how, but all of the enums do seem like a bit much.







[GitHub] [accumulo] dlmarion commented on a change in pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#discussion_r741828909



##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+public interface ClientThreadPools {
+
+  class ThreadPoolConfig {
+
+    public static final ThreadPoolConfig EMPTY_CONFIG =
+        new ThreadPoolConfig(Optional.empty(), Optional.empty(), Optional.empty());
+
+    private final Optional<Iterable<Entry<String,String>>> configuration;
+    private final Optional<Integer> numThreads;
+    private final Optional<String> threadName;
+
+    public ThreadPoolConfig(Iterable<Entry<String,String>> configuration) {
+      this(Optional.of(configuration), Optional.empty(), Optional.empty());
+    }
+
+    public ThreadPoolConfig(Iterable<Entry<String,String>> configuration, int numThreads) {
+      this(Optional.of(configuration), Optional.of(numThreads), Optional.empty());
+    }
+
+    public ThreadPoolConfig(Iterable<Entry<String,String>> configuration, int numThreads,
+        String threadName) {
+      this(Optional.of(configuration), Optional.of(numThreads), Optional.of(threadName));
+    }
+
+    private ThreadPoolConfig(Optional<Iterable<Entry<String,String>>> configuration,
+        Optional<Integer> numThreads, Optional<String> threadName) {
+      this.configuration = configuration;
+      this.numThreads = numThreads;
+      this.threadName = threadName;
+    }
+
+    public Optional<Iterable<Entry<String,String>>> getConfiguration() {
+      return configuration;
+    }
+
+    public Optional<Integer> getNumThreads() {
+      return numThreads;
+    }
+
+    public Optional<String> getThreadName() {
+      return threadName;
+    }
+  }
+
+  enum ThreadPoolUsage {
+    /**
+     * ThreadPoolExecutor that runs bulk import tasks
+     */
+    BULK_IMPORT_POOL,
+    /**
+     * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction
+     * information
+     */
+    ACTIVE_EXTERNAL_COMPACTION_POOL,
+    /**
+     * ThreadPoolExecutor used for fetching data from the TabletServers
+     */
+    SCANNER_READ_AHEAD_POOL,
+    /**
+     * ThreadPoolExecutor used for adding splits to a table
+     */
+    ADD_SPLITS_THREAD_POOL,
+    /**
+     * ThreadPoolExecutor used for fetching data from the TabletServers
+     */
+    BATCH_SCANNER_READ_AHEAD_POOL,
+    /**
+     * ThreadPoolExecutor that runs the tasks of binning mutations
+     */
+    BATCH_WRITER_BINNING_POOL,
+    /**
+     * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+     */
+    BATCH_WRITER_SEND_POOL,
+    /**
+     * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+     */
+    CONDITIONAL_WRITER_CLEANUP_TASK_POOL,
+    /**
+     * ThreadPoolExecutor responsible for loading bloom filters
+     */
+    BLOOM_FILTER_LAYER_LOADER_POOL
+  }
+
+  enum ScheduledThreadPoolUsage {
+    /**
+     * shared scheduled executor for trivial tasks
+     */
+    SHARED_GENERAL_SCHEDULED_TASK_POOL,
+    /**
+     * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+     * goals.
+     */
+    BATCH_WRITER_LATENCY_TASK_POOL,
+    /**
+     * ScheduledThreadPoolExecutor that periodically runs tasks to handle failed write mutations and
+     * send mutations to TabletServers
+     */
+    CONDITIONAL_WRITER_RETRY_POOL
+  }
+
+  /**
+   * return a ThreadPoolExecutor configured for the specified usage
+   *
+   * @param usage
+   *          thread pool usage
+   * @param config
+   *          thread pool configuration
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getThreadPool(ThreadPoolUsage usage, ThreadPoolConfig config);

Review comment:
       > Could an ExecutorService type be returned instead of ThreadPoolExecutor?
   
   Some methods on ScheduledThreadPoolExecutor, such as remove(Runnable), aren't on the ExecutorService interface.
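
As a small illustration of that gap, the sketch below removes a pending task through the concrete class. It uses only JDK classes; `RemoveDemo` and `scheduleThenRemove` are hypothetical names, and how Accumulo itself uses `remove` is not shown here.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo; not code from the pull request.
class RemoveDemo {

  // Schedules a far-future task, removes it through the concrete class, and
  // returns {queue size before removal, queue size after removal}.
  static int[] scheduleThenRemove() {
    ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
    try {
      ScheduledFuture<?> pending = pool.schedule(() -> {}, 1, TimeUnit.HOURS);
      int before = pool.getQueue().size();
      // remove(Runnable) and getQueue() are declared on ThreadPoolExecutor, not on
      // ExecutorService or ScheduledExecutorService. The queued object is the future
      // returned by schedule(), which this executor implements as a Runnable.
      if (pending instanceof Runnable) {
        pool.remove((Runnable) pending);
      }
      int after = pool.getQueue().size();
      return new int[] {before, after};
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    int[] sizes = scheduleThenRemove();
    System.out.println("queued before: " + sizes[0] + ", after: " + sizes[1]);
  }
}
```

Code written against `ScheduledExecutorService` alone could still call `cancel` on the returned future, but it would have no direct way to inspect or prune the work queue.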

##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.clientImpl.ClientContext;
+
+public interface ClientThreadPools {
+
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);
+
+  /**
+   * ThreadPoolExecutor that runs bulk import tasks
+   *
+   * @param ctx
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBulkImportThreadPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction information
+   *
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getExternalCompactionActiveCompactionsPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getScannerReadAheadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for adding splits to a table
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getAddSplitsThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @param numQueryThreads
+   * @param batchReaderInstance
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchReaderThreadPool(ClientContext ctx, int numQueryThreads,
+      int batchReaderInstance);
+
+  /**
+   * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+   * goals.
+   * 
+   * @param ctx
+   *          client context object
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getBatchWriterLatencyTasksThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of binning mutations
+   * 
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterBinningThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+   *
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterSendThreadPool(ClientContext ctx, int numSendThreads);
+
+  /**
+   * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getConditionalWriterCleanupTaskThreadPool(ClientContext ctx);

Review comment:
       I'm not sure how we do that. We need to pass enough information to determine which pool to return and how to configure it. I'm open to suggestions.
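
For context, a minimal sketch of the dispatch this comment describes: the usage selects the pool, and an optional thread count configures it. `UsageDispatchDemo`, the two-value `Usage` enum, and the default sizes are assumptions made for illustration, not values from this PR.

```java
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; enum values and defaults are illustrative only.
class UsageDispatchDemo {

  enum Usage { BULK_IMPORT_POOL, SCANNER_READ_AHEAD_POOL }

  // Picks pool settings from the usage and an optional caller-supplied thread count.
  static ThreadPoolExecutor getThreadPool(Usage usage, Optional<Integer> numThreads) {
    int threads;
    switch (usage) {
      case BULK_IMPORT_POOL:
        threads = numThreads.orElse(4); // assumed default
        break;
      case SCANNER_READ_AHEAD_POOL:
        threads = numThreads.orElse(1); // assumed default
        break;
      default:
        throw new IllegalArgumentException("Unhandled usage: " + usage);
    }
    ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 3L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>());
    // Let idle core threads exit so an unused pool does not hold threads open.
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  public static void main(String[] args) {
    ThreadPoolExecutor pool = getThreadPool(Usage.BULK_IMPORT_POOL, Optional.of(8));
    System.out.println(pool.getCorePoolSize());
    pool.shutdown();
  }
}
```

Every new usage needs a new enum value and a new branch, which is the growth the reviewers are worried about.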

##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.clientImpl.ClientContext;
+
+public interface ClientThreadPools {
+
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);
+
+  /**
+   * ThreadPoolExecutor that runs bulk import tasks
+   *
+   * @param ctx
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBulkImportThreadPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction information
+   *
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getExternalCompactionActiveCompactionsPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getScannerReadAheadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for adding splits to a table
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getAddSplitsThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @param numQueryThreads
+   * @param batchReaderInstance
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchReaderThreadPool(ClientContext ctx, int numQueryThreads,
+      int batchReaderInstance);
+
+  /**
+   * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+   * goals.
+   * 
+   * @param ctx
+   *          client context object
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getBatchWriterLatencyTasksThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of binning mutations
+   * 
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterBinningThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+   *
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterSendThreadPool(ClientContext ctx, int numSendThreads);
+
+  /**
+   * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getConditionalWriterCleanupTaskThreadPool(ClientContext ctx);

Review comment:
       I have some changes locally where I'm cleaning up potentially unclosed thread pools (for example, if someone did not call close). This means that clients might be able to specify their own implementation, but they would not be able to share the thread pool within the client or with the client's application. I'm starting to think that the only thing a user would be able to configure is the number of threads in the pools. Currently the number of threads is either hardcoded or based on a property.

##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+public interface ClientThreadPools {
+
+  class ThreadPoolConfig {
+
+    public static final ThreadPoolConfig EMPTY_CONFIG =
+        new ThreadPoolConfig(Optional.empty(), Optional.empty(), Optional.empty());
+
+    private final Optional<Iterable<Entry<String,String>>> configuration;
+    private final Optional<Integer> numThreads;
+    private final Optional<String> threadName;
+
+    public ThreadPoolConfig(Iterable<Entry<String,String>> configuration) {
+      this(Optional.of(configuration), Optional.empty(), Optional.empty());
+    }
+
+    public ThreadPoolConfig(Iterable<Entry<String,String>> configuration, int numThreads) {
+      this(Optional.of(configuration), Optional.of(numThreads), Optional.empty());
+    }
+
+    public ThreadPoolConfig(Iterable<Entry<String,String>> configuration, int numThreads,
+        String threadName) {
+      this(Optional.of(configuration), Optional.of(numThreads), Optional.of(threadName));
+    }
+
+    private ThreadPoolConfig(Optional<Iterable<Entry<String,String>>> configuration,
+        Optional<Integer> numThreads, Optional<String> threadName) {
+      this.configuration = configuration;
+      this.numThreads = numThreads;
+      this.threadName = threadName;
+    }
+
+    public Optional<Iterable<Entry<String,String>>> getConfiguration() {
+      return configuration;
+    }
+
+    public Optional<Integer> getNumThreads() {
+      return numThreads;
+    }
+
+    public Optional<String> getThreadName() {
+      return threadName;
+    }
+  }
+
+  enum ThreadPoolUsage {
+    /**
+     * ThreadPoolExecutor that runs bulk import tasks
+     */
+    BULK_IMPORT_POOL,
+    /**
+     * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction
+     * information
+     */
+    ACTIVE_EXTERNAL_COMPACTION_POOL,
+    /**
+     * ThreadPoolExecutor used for fetching data from the TabletServers
+     */
+    SCANNER_READ_AHEAD_POOL,
+    /**
+     * ThreadPoolExecutor used for adding splits to a table
+     */
+    ADD_SPLITS_THREAD_POOL,
+    /**
+     * ThreadPoolExecutor used for fetching data from the TabletServers
+     */
+    BATCH_SCANNER_READ_AHEAD_POOL,
+    /**
+     * ThreadPoolExecutor that runs the tasks of binning mutations
+     */
+    BATCH_WRITER_BINNING_POOL,
+    /**
+     * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+     */
+    BATCH_WRITER_SEND_POOL,
+    /**
+     * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+     */
+    CONDITIONAL_WRITER_CLEANUP_TASK_POOL,
+    /**
+     * ThreadPoolExecutor responsible for loading bloom filters
+     */
+    BLOOM_FILTER_LAYER_LOADER_POOL
+  }
+
+  enum ScheduledThreadPoolUsage {
+    /**
+     * shared scheduled executor for trivial tasks
+     */
+    SHARED_GENERAL_SCHEDULED_TASK_POOL,
+    /**
+     * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+     * goals.
+     */
+    BATCH_WRITER_LATENCY_TASK_POOL,
+    /**
+     * ScheduledThreadPoolExecutor that periodically runs tasks to handle failed write mutations and
+     * send mutations to TabletServers
+     */
+    CONDITIONAL_WRITER_RETRY_POOL
+  }
+
+  /**
+   * return a ThreadPoolExecutor configured for the specified usage
+   *
+   * @param usage
+   *          thread pool usage
+   * @param config
+   *          thread pool configuration
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getThreadPool(ThreadPoolUsage usage, ThreadPoolConfig config);

Review comment:
       > Could an ExecutorService type be returned instead of ThreadPoolExecutor?
   
   Some methods on ScheduledThreadPoolExecutor, such as remove(Runnable), are not on the ExecutorService interface.
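
   To illustrate the point about `remove(Runnable)`, here is a minimal sketch using only the JDK. The class and method names are hypothetical and not part of this PR. It shows that removing a queued task needs the concrete executor type, because `ExecutorService` does not declare `remove`.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of Accumulo.
public class RemoveOnConcreteType {

  /** Schedules a task far in the future, removes it again, and reports whether the queue is empty. */
  public static boolean scheduleThenRemove() {
    ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
    try {
      // The returned future is the object that actually sits in the work queue.
      ScheduledFuture<?> queued = pool.schedule(() -> {}, 1, TimeUnit.HOURS);
      // remove(Runnable) is declared on ThreadPoolExecutor. If 'pool' were typed as
      // ExecutorService, this call would not compile.
      boolean removed = pool.remove((Runnable) queued);
      return removed && pool.getQueue().isEmpty();
    } finally {
      pool.shutdownNow();
    }
  }
}
```

   Returning `ExecutorService` from the API would therefore require either a cast at each call site that uses such methods, or a different way to cancel queued work (for example, `Future.cancel`).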









[GitHub] [accumulo] dlmarion commented on a change in pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#discussion_r741430001



##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
##########
@@ -229,6 +228,7 @@ private Path checkPath(FileSystem fs, String dir) throws IOException, AccumuloEx
   }
 
   @Override
+  @Deprecated(since = "2.1.0")
   public ImportMappingOptions executor(Executor service) {

Review comment:
       Resolved in 6c996df







[GitHub] [accumulo] dlmarion commented on pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion commented on pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#issuecomment-958119343


   While looking at this, I noticed that `CleanerUtil.shutdownThreadPoolExecutor` is called from only one location in the client code (TabletServerBatchReader). Should we fix that also?
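
   For context, a minimal sketch of the general pattern of shutting down a pool when its owner is closed or garbage collected, using `java.lang.ref.Cleaner` from the JDK. This is not Accumulo's `CleanerUtil`; the class name `CleanedPoolOwner` and its methods are assumptions made for illustration only.

```java
import java.lang.ref.Cleaner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical owner of an internally managed pool; not Accumulo code.
public class CleanedPoolOwner implements AutoCloseable {

  private static final Cleaner CLEANER = Cleaner.create();

  private final ExecutorService pool;
  private final Cleaner.Cleanable cleanable;

  public CleanedPoolOwner(int numThreads) {
    ExecutorService p = Executors.newFixedThreadPool(numThreads);
    this.pool = p;
    // The cleanup action references only the pool, never 'this'. If it captured
    // the owner, the owner could never become unreachable and would not be cleaned.
    this.cleanable = CLEANER.register(this, p::shutdownNow);
  }

  public ExecutorService pool() {
    return pool;
  }

  @Override
  public void close() {
    // clean() runs the action at most once, so a later GC-triggered clean is a no-op.
    cleanable.clean();
  }
}
```

   A pool managed this way is shut down together with its owner, so it cannot safely be shared with other objects that outlive that owner.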





[GitHub] [accumulo] dlmarion commented on pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion commented on pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#issuecomment-959817323


   What are your thoughts about using [CompletableFuture.completeExceptionally(Throwable ex)](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CompletableFuture.html#completeExceptionally(java.lang.Throwable)) instead of doing:
   
   ```
      } catch (Exception e) { 
        throw new CompletionException(e); 
      } 
   ```
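
   To make the comparison concrete, here is a small sketch of both styles using only JDK classes. The class `FailureStyles` and the helper `load` are hypothetical stand-ins for the real task. In both cases the caller of `join()` sees a `CompletionException` whose cause is the original exception.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class; not part of Accumulo.
public class FailureStyles {

  // Stand-in for work that can throw a checked exception.
  private static String load(boolean fail) throws IOException {
    if (fail) {
      throw new IOException("boom");
    }
    return "ok";
  }

  /** Style 1: wrap and rethrow from inside the supplier. */
  public static CompletableFuture<String> viaThrow() {
    return CompletableFuture.supplyAsync(() -> {
      try {
        return load(true);
      } catch (Exception e) {
        throw new CompletionException(e);
      }
    });
  }

  /** Style 2: complete a manually created future, normally or exceptionally. */
  public static CompletableFuture<String> viaCompleteExceptionally() {
    CompletableFuture<String> future = new CompletableFuture<>();
    CompletableFuture.runAsync(() -> {
      try {
        future.complete(load(true));
      } catch (Exception e) {
        future.completeExceptionally(e);
      }
    });
    return future;
  }

  /** Returns the class of the cause reported by join(), or null if the future succeeded. */
  public static Class<?> causeOf(CompletableFuture<?> future) {
    try {
      future.join();
      return null;
    } catch (CompletionException e) {
      return e.getCause().getClass();
    }
  }
}
```

   The second style needs a future that is created up front and completed by hand, so it fits best where the code is not already using `supplyAsync`.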





[GitHub] [accumulo] keith-turner commented on pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
keith-turner commented on pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#issuecomment-958183492


   For reference, this may be the only place in the API where a user can currently pass in an Executor:
   
   https://github.com/apache/accumulo/blob/dc847b1fb971efa81e6fcc24f126e93fcc1ddf8a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java#L776-L785





[GitHub] [accumulo] keith-turner commented on pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
keith-turner commented on pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#issuecomment-959157451


   If the goal of this change is that Accumulo client code never has to catch Error, then this change will not completely achieve it. Any time we use Future or CompletableFuture with a thread pool, we still catch Throwable indirectly via the Java Future code. That code wraps each task submitted to the thread pool in code that catches Throwable, so the thread pool itself never sees errors.
   
   For example, the following code calls `supplyAsync()`, which queues a CompletableFuture task on `executor`. The queued task catches Throwable, so the executor never sees any errors.
   
   https://github.com/apache/accumulo/blob/785a3645261571f000a7adb3c7c72b07886f0587/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java#L544-L563
   
   If an error did occur, the following code would see it wrapped in an ExecutionException, wrap that in a RuntimeException, and punt it back to the user. So we would still be catching Errors in the Accumulo client code unless we stop using Java futures.
   
   https://github.com/apache/accumulo/blob/785a3645261571f000a7adb3c7c72b07886f0587/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java#L577-L579
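
   The behavior described above can be reproduced with a small stand-alone sketch that uses only the JDK. The class name `FutureHidesError` and the thread name are made up for the demo. The worker thread has an uncaught exception handler installed, yet an `Error` thrown inside `supplyAsync()` never reaches it and only shows up as the cause of the `ExecutionException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of Accumulo.
public class FutureHidesError {

  public static String run() {
    AtomicBoolean handlerInvoked = new AtomicBoolean(false);
    // Every worker thread gets a handler that records whether it was ever called.
    ThreadFactory factory = r -> {
      Thread t = new Thread(r, "demo-worker");
      t.setUncaughtExceptionHandler((thread, err) -> handlerInvoked.set(true));
      return t;
    };
    ExecutorService executor = Executors.newSingleThreadExecutor(factory);
    try {
      CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        throw new Error("simulated failure");
      }, executor);
      String cause;
      try {
        future.get(10, TimeUnit.SECONDS);
        cause = "none";
      } catch (ExecutionException e) {
        // The Error was caught by the future machinery and stored as the cause.
        cause = e.getCause().getClass().getSimpleName();
      }
      // Let the worker finish before reading the flag, to rule out a race.
      executor.shutdown();
      executor.awaitTermination(10, TimeUnit.SECONDS);
      return "cause=" + cause + ", handlerInvoked=" + handlerInvoked.get();
    } catch (InterruptedException | TimeoutException e) {
      return "unexpected: " + e;
    } finally {
      executor.shutdownNow();
    }
  }
}
```

   Calling `FutureHidesError.run()` returns `cause=Error, handlerInvoked=false`.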
   





[GitHub] [accumulo] dlmarion commented on pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion commented on pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#issuecomment-960007378


   @ctubbsii @keith-turner - I'm thinking about getting rid of the client-supplied thread pools idea. Without that, this PR contains:
    1. Modifying the AccumuloUncaughtExceptionHandler to not halt the VM when not running in an Accumulo server process
    2. Using CleanerUtil on all client-side thread pools
    3. Propagating Errors from background tasks/threads to the client





[GitHub] [accumulo] dlmarion commented on a change in pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#discussion_r741835959



##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.clientImpl.ClientContext;
+
+public interface ClientThreadPools {
+
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);
+
+  /**
+   * ThreadPoolExecutor that runs bulk import tasks
+   *
+   * @param ctx
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBulkImportThreadPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction information
+   *
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getExternalCompactionActiveCompactionsPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getScannerReadAheadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for adding splits to a table
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getAddSplitsThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @param numQueryThreads
+   * @param batchReaderInstance
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchReaderThreadPool(ClientContext ctx, int numQueryThreads,
+      int batchReaderInstance);
+
+  /**
+   * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+   * goals.
+   * 
+   * @param ctx
+   *          client context object
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getBatchWriterLatencyTasksThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of binning mutations
+   * 
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterBinningThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+   *
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterSendThreadPool(ClientContext ctx, int numSendThreads);
+
+  /**
+   * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getConditionalWriterCleanupTaskThreadPool(ClientContext ctx);

Review comment:
       I'm not sure how we do that. We need to pass enough information to determine which pool to return and how to configure it. I'm open to suggestions.







[GitHub] [accumulo] ctubbsii commented on a change in pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#discussion_r741297197



##########
File path: core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java
##########
@@ -338,6 +338,13 @@ ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterCon
   @Override
   void close();
 
+  /**
+   * Sets a user-defined ClientThreadPools implementation
+   *
+   * @param impl

Review comment:
       javadoc quality: remove tag if no description, or add description

##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.clientImpl.ClientContext;
+
+public interface ClientThreadPools {
+
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);
+
+  /**
+   * ThreadPoolExecutor that runs bulk import tasks
+   *
+   * @param ctx
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBulkImportThreadPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction information
+   *
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getExternalCompactionActiveCompactionsPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getScannerReadAheadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for adding splits to a table
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getAddSplitsThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @param numQueryThreads
+   * @param batchReaderInstance
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchReaderThreadPool(ClientContext ctx, int numQueryThreads,
+      int batchReaderInstance);
+
+  /**
+   * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+   * goals.
+   * 
+   * @param ctx
+   *          client context object
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getBatchWriterLatencyTasksThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of binning mutations
+   * 
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterBinningThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+   *
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterSendThreadPool(ClientContext ctx, int numSendThreads);
+
+  /**
+   * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getConditionalWriterCleanupTaskThreadPool(ClientContext ctx);

Review comment:
       Having one central location that entangles itself with so many disparate public API methods seems less than ideal. I think it would be better if each entry point (batch scanner, batch writer, conditional writer, etc.) added an option to their existing builders/config to set a thread pool... or more narrowly, just an uncaught exception handler for the thread pools we manage internally.

##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.clientImpl.ClientContext;
+
+public interface ClientThreadPools {
+
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);

Review comment:
       ClientContext is not public API, and should not be

##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/AccumuloUncaughtExceptionHandler.java
##########
@@ -44,7 +46,10 @@ public void uncaughtException(Thread t, Throwable e) {
         // If e == OutOfMemoryError, then it's probably that another Error might be
         // thrown when trying to print to System.err.
       } finally {
-        Runtime.getRuntime().halt(-1);
+        Mode m = SingletonManager.getMode();
+        if (m != null && m.equals(Mode.SERVER)) {
+          Runtime.getRuntime().halt(-1);
+        }

Review comment:
       Rather than making this handler behave differently in some circumstances, why not make it so a different handler can be configured instead?

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
##########
@@ -229,6 +228,7 @@ private Path checkPath(FileSystem fs, String dir) throws IOException, AccumuloEx
   }
 
   @Override
+  @Deprecated(since = "2.1.0")
   public ImportMappingOptions executor(Executor service) {

Review comment:
       It's not clear. Why is this option being deprecated in the bulk import builder?







[GitHub] [accumulo] ctubbsii commented on a change in pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#discussion_r741297197



##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.clientImpl.ClientContext;
+
+public interface ClientThreadPools {
+
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);
+
+  /**
+   * ThreadPoolExecutor that runs bulk import tasks
+   *
+   * @param ctx
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBulkImportThreadPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction information
+   *
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getExternalCompactionActiveCompactionsPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getScannerReadAheadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for adding splits to a table
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getAddSplitsThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @param numQueryThreads
+   * @param batchReaderInstance
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchReaderThreadPool(ClientContext ctx, int numQueryThreads,
+      int batchReaderInstance);
+
+  /**
+   * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+   * goals.
+   * 
+   * @param ctx
+   *          client context object
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getBatchWriterLatencyTasksThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of binning mutations
+   * 
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterBinningThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+   *
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterSendThreadPool(ClientContext ctx, int numSendThreads);
+
+  /**
+   * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getConditionalWriterCleanupTaskThreadPool(ClientContext ctx);

Review comment:
       We did try to make those builders/config extensible, so users wouldn't have to modify anything unless they were taking advantage of the new feature. However, I think I have a better idea anyway.
   
   I noticed that this class is essentially a ThreadPoolExecutorFactory for either `ThreadPoolExecutor` or `ScheduledThreadPoolExecutor`. Instead of having so many disparate methods, we can make the interface substantially simpler, and still support all the different uses, by passing in some kind of context/scope, as in:
   
   ```java
       // this name is bad, but illustrates the idea; this could even be a String, if we don't want to constrain it
       enum ThreadPoolScope {
         BATCH_WRITER, BATCH_SCANNER, CONDITIONAL_WRITER;
       }
       ThreadPoolExecutor getExecutor(ThreadPoolScope scope, ConfigSource conf);
       ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor(ThreadPoolScope scope, ConfigSource conf);
   ```
   
   The main idea here is to keep the API as free of bloat as possible, but still be extensible and applicable to all our use cases.
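   
   A minimal sketch of how such a scope-keyed factory might be implemented, offered only as an illustration. It assumes a plain `int numThreads` in place of the `ConfigSource` argument, and `ScopedThreadPools` and `DefaultScopedThreadPools` are hypothetical names that do not exist in Accumulo:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Scope names taken from the comment above; the type itself is hypothetical.
enum ThreadPoolScope {
  BATCH_WRITER, BATCH_SCANNER, CONDITIONAL_WRITER
}

// Hypothetical two-method factory; numThreads stands in for the ConfigSource argument.
interface ScopedThreadPools {
  ThreadPoolExecutor getExecutor(ThreadPoolScope scope, int numThreads);

  ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor(ThreadPoolScope scope,
      int numThreads);
}

final class DefaultScopedThreadPools implements ScopedThreadPools {

  @Override
  public ThreadPoolExecutor getExecutor(ThreadPoolScope scope, int numThreads) {
    // Fixed-size pool with an unbounded queue; idle threads are released after 60 seconds.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(numThreads, numThreads, 60L,
        TimeUnit.SECONDS, new LinkedBlockingQueue<>(), daemonFactory(scope));
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  @Override
  public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor(ThreadPoolScope scope,
      int numThreads) {
    return new ScheduledThreadPoolExecutor(numThreads, daemonFactory(scope));
  }

  // Names each thread after its scope so that thread dumps show which feature owns it.
  private static ThreadFactory daemonFactory(ThreadPoolScope scope) {
    return runnable -> {
      Thread t = new Thread(runnable, "client-" + scope.name());
      t.setDaemon(true);
      return t;
    };
  }
}
```

   A user who wants different behavior for one scope would implement the same two methods and switch on `scope`, rather than overriding one method per pool.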

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
##########
@@ -229,6 +228,7 @@ private Path checkPath(FileSystem fs, String dir) throws IOException, AccumuloEx
   }
 
   @Override
+  @Deprecated(since = "2.1.0")
   public ImportMappingOptions executor(Executor service) {

Review comment:
       Okay, a javadoc `@deprecated` note pointing to the new API would be helpful here.
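   
   For illustration, a sketch of pairing the annotation with a javadoc `@deprecated` tag. `ImportOptionsSketch` is a hypothetical stand-in for the bulk import options class, and the wording of the note is only an assumption about what the replacement guidance would say:

```java
import java.util.concurrent.Executor;

// Hypothetical stand-in for the bulk import options; not the real Accumulo class.
final class ImportOptionsSketch {
  private Executor executor;

  /**
   * Sets the executor used to run bulk import tasks.
   *
   * @param service
   *          executor that will run the tasks
   * @return this object, for chaining
   * @deprecated since 2.1.0; supply a ClientThreadPools implementation to the client instead
   */
  @Deprecated(since = "2.1.0")
  public ImportOptionsSketch executor(Executor service) {
    this.executor = service;
    return this;
  }

  // Accessor used only to show that the deprecated setter still works.
  Executor executor() {
    return executor;
  }
}
```

   The annotation makes the compiler warn callers, while the javadoc tag tells them what to use instead.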

##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/AccumuloUncaughtExceptionHandler.java
##########
@@ -44,7 +46,10 @@ public void uncaughtException(Thread t, Throwable e) {
         // If e == OutOfMemoryError, then it's probably that another Error might be
         // thrown when trying to print to System.err.
       } finally {
-        Runtime.getRuntime().halt(-1);
+        Mode m = SingletonManager.getMode();
+        if (m != null && m.equals(Mode.SERVER)) {
+          Runtime.getRuntime().halt(-1);
+        }

Review comment:
       Never mind on this. I think the current idea of providing the executor factory is better.







[GitHub] [accumulo] dlmarion commented on pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion commented on pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#issuecomment-959379417


   Thanks for the clarification. I missed that CompletableFuture was catching Throwable. In that case, I would say that the BulkImport code is incorrect. In the code section you [referenced](https://github.com/apache/accumulo/blob/785a3645261571f000a7adb3c7c72b07886f0587/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java#L577-L579), should it not get the cause and rethrow it if it's an Error?
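
   A rough sketch of what that could look like, using only the JDK. `RethrowErrors` and `joinRethrowingErrors` are hypothetical names, and this is not the BulkImport code itself, which may wait on its futures differently:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class RethrowErrors {

  // Waits for the future. If the task failed with an Error, rethrows that Error
  // unwrapped; any other failure is left wrapped in the CompletionException.
  static <T> T joinRethrowingErrors(CompletableFuture<T> future) {
    try {
      return future.join();
    } catch (CompletionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof Error) {
        throw (Error) cause;
      }
      throw e;
    }
  }

  public static void main(String[] args) {
    // supplyAsync catches the Throwable and completes the future exceptionally.
    CompletableFuture<String> failed = CompletableFuture.supplyAsync(() -> {
      throw new AssertionError("simulated failure inside the task");
    });
    try {
      joinRethrowingErrors(failed);
    } catch (AssertionError err) {
      System.out.println("caller saw the Error: " + err.getMessage());
    }
  }
}
```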





[GitHub] [accumulo] dlmarion commented on a change in pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#discussion_r741323905



##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.clientImpl.ClientContext;
+
+public interface ClientThreadPools {
+
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);

Review comment:
       Fixed in 94a6163. I should have waited for the checks to finish before asking you to review.

##########
File path: core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java
##########
@@ -338,6 +338,13 @@ ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterCon
   @Override
   void close();
 
+  /**
+   * Sets a user-defined ClientThreadPools implementation
+   *
+   * @param impl

Review comment:
       Fixed in 94a6163. I should have waited for the checks to finish before asking you to review.

##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.clientImpl.ClientContext;
+
+public interface ClientThreadPools {
+
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);
+
+  /**
+   * ThreadPoolExecutor that runs bulk import tasks
+   *
+   * @param ctx
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBulkImportThreadPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction information
+   *
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getExternalCompactionActiveCompactionsPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getScannerReadAheadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for adding splits to a table
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getAddSplitsThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @param numQueryThreads
+   * @param batchReaderInstance
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchReaderThreadPool(ClientContext ctx, int numQueryThreads,
+      int batchReaderInstance);
+
+  /**
+   * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+   * goals.
+   * 
+   * @param ctx
+   *          client context object
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getBatchWriterLatencyTasksThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of binning mutations
+   * 
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterBinningThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+   *
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterSendThreadPool(ClientContext ctx, int numSendThreads);
+
+  /**
+   * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getConditionalWriterCleanupTaskThreadPool(ClientContext ctx);

Review comment:
       I was trying to reduce the number of places where users would have to modify their pre-2.1.0 code. This approach lets them supply their own implementation once, when they create the client, instead of specifying one for each scanner/writer/etc. Additionally, by creating their own thread pools, they can set their own uncaught exception handler.
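
   As a sketch of that last point, using only the JDK: a user-created pool can install its own handler through the `ThreadFactory`. `HandlerPoolSketch`, `newPool`, and `runAndCapture` are hypothetical names, not part of this PR:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class HandlerPoolSketch {

  // Builds a fixed-size pool whose worker threads all report to the given handler.
  static ThreadPoolExecutor newPool(int numThreads, Thread.UncaughtExceptionHandler handler) {
    ThreadFactory factory = runnable -> {
      Thread t = new Thread(runnable, "user-pool-worker");
      t.setDaemon(true);
      t.setUncaughtExceptionHandler(handler);
      return t;
    };
    return new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(), factory);
  }

  // Runs one task on a fresh single-thread pool and returns what the handler saw,
  // or null if the handler was not called within five seconds.
  static Throwable runAndCapture(Runnable task) {
    AtomicReference<Throwable> seen = new AtomicReference<>();
    CountDownLatch handled = new CountDownLatch(1);
    ThreadPoolExecutor pool = newPool(1, (thread, error) -> {
      seen.set(error);
      handled.countDown();
    });
    try {
      // execute(), not submit(): submit() would capture the failure in the returned Future
      // and the handler would never be called.
      pool.execute(task);
      handled.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      pool.shutdown();
    }
    return seen.get();
  }

  public static void main(String[] args) {
    Throwable seen = runAndCapture(() -> {
      throw new IllegalStateException("simulated task failure");
    });
    System.out.println("handler saw: " + seen);
  }
}
```

   Unlike the handler changed in this PR, a handler installed this way only logs or records the failure; whether to halt the process is left to the application.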

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
##########
@@ -229,6 +228,7 @@ private Path checkPath(FileSystem fs, String dir) throws IOException, AccumuloEx
   }
 
   @Override
+  @Deprecated(since = "2.1.0")
   public ImportMappingOptions executor(Executor service) {

Review comment:
       It's available in the ClientThreadPools implementation.







[GitHub] [accumulo] dlmarion commented on pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion commented on pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#issuecomment-958119343


   While looking at this, I noticed that `CleanerUtil.shutdownThreadPoolExecutor` is called from only one location in the client code (TabletServerBatchReader). Should we fix that also?





[GitHub] [accumulo] dlmarion commented on a change in pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#discussion_r741415897



##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.clientImpl.ClientContext;
+
+public interface ClientThreadPools {
+
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);
+
+  /**
+   * ThreadPoolExecutor that runs bulk import tasks
+   *
+   * @param ctx
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBulkImportThreadPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction information
+   *
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getExternalCompactionActiveCompactionsPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getScannerReadAheadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for adding splits to a table
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getAddSplitsThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @param numQueryThreads
+   * @param batchReaderInstance
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchReaderThreadPool(ClientContext ctx, int numQueryThreads,
+      int batchReaderInstance);
+
+  /**
+   * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+   * goals.
+   * 
+   * @param ctx
+   *          client context object
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getBatchWriterLatencyTasksThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of binning mutations
+   * 
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterBinningThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+   *
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterSendThreadPool(ClientContext ctx, int numSendThreads);
+
+  /**
+   * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getConditionalWriterCleanupTaskThreadPool(ClientContext ctx);

Review comment:
       see 2effcce.







[GitHub] [accumulo] keith-turner commented on a change in pull request #2340: Client-side thread pools

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2340:
URL: https://github.com/apache/accumulo/pull/2340#discussion_r741996230



##########
File path: core/src/main/java/org/apache/accumulo/core/client/ClientThreadPools.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.clientImpl.ClientContext;
+
+public interface ClientThreadPools {
+
+  /**
+   * return a shared scheduled executor for trivial tasks
+   *
+   * @param ctx
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getSharedScheduledExecutor(ClientContext context);
+
+  /**
+   * ThreadPoolExecutor that runs bulk import tasks
+   *
+   * @param ctx
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBulkImportThreadPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction information
+   *
+   * @param numThreads
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getExternalCompactionActiveCompactionsPool(ClientContext ctx, int numThreads);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getScannerReadAheadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for adding splits to a table
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getAddSplitsThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor used for fetching data from the TabletServers
+   *
+   * @param ctx
+   * @param numQueryThreads
+   * @param batchReaderInstance
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchReaderThreadPool(ClientContext ctx, int numQueryThreads,
+      int batchReaderInstance);
+
+  /**
+   * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+   * goals.
+   * 
+   * @param ctx
+   *          client context object
+   * @return ScheduledThreadPoolExecutor
+   */
+  ScheduledThreadPoolExecutor getBatchWriterLatencyTasksThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of binning mutations
+   * 
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterBinningThreadPool(ClientContext ctx);
+
+  /**
+   * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+   *
+   * @param ctx
+   *          client context object
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getBatchWriterSendThreadPool(ClientContext ctx, int numSendThreads);
+
+  /**
+   * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+   *
+   * @param ctx
+   * @return ThreadPoolExecutor
+   */
+  ThreadPoolExecutor getConditionalWriterCleanupTaskThreadPool(ClientContext ctx);

Review comment:
       The enums do not properly capture all of the complexity. For example, I see that a SynchronousQueue is used for one enum and 0 to 3 threads are used for another. It seems like the enum could be dropped and everything needed for creation pushed into the config object, though maybe that becomes too complex for anyone to ever use properly.
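
       As a rough sketch of what "everything needed for creation in the config object" might look like: `PoolConfig` and its factory methods below are hypothetical names and shapes, not anything in this PR, and the keep-alive values are arbitrary placeholders.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical config object (not part of the PR): carries every value the
// ThreadPoolExecutor constructor needs, so no enum has to imply them.
final class PoolConfig {
  final int coreThreads;
  final int maxThreads;
  final long keepAliveSeconds;
  final Supplier<BlockingQueue<Runnable>> queueSupplier;

  PoolConfig(int coreThreads, int maxThreads, long keepAliveSeconds,
      Supplier<BlockingQueue<Runnable>> queueSupplier) {
    this.coreThreads = coreThreads;
    this.maxThreads = maxThreads;
    this.keepAliveSeconds = keepAliveSeconds;
    this.queueSupplier = queueSupplier;
  }

  // Direct hand-off: no queued tasks, threads grow from 0 up to maxThreads.
  static PoolConfig handOff(int maxThreads) {
    return new PoolConfig(0, maxThreads, 3L, SynchronousQueue::new);
  }

  // Fixed size with an unbounded queue.
  static PoolConfig fixed(int numThreads) {
    return new PoolConfig(numThreads, numThreads, 60L, LinkedBlockingQueue::new);
  }

  ThreadPoolExecutor build() {
    return new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveSeconds,
        TimeUnit.SECONDS, queueSupplier.get());
  }
}
```

       Even this small version exposes four interacting knobs, which illustrates the concern that it may be hard to use properly: for instance, pairing 0 core threads with an unbounded queue would never grow the pool past one thread.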
   
   




