You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2022/04/12 15:37:12 UTC

[GitHub] [accumulo] ctubbsii commented on a diff in pull request #2609: Implement a write thread limit

ctubbsii commented on code in PR #2609:
URL: https://github.com/apache/accumulo/pull/2609#discussion_r848568550


##########
server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java:
##########
@@ -686,15 +686,29 @@ private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
 
   @Override
   public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
-      List<TMutation> tmutations) {
+      List<TMutation> tmutations) throws TException {
     UpdateSession us = (UpdateSession) server.sessionManager.reserveSession(updateID);
     if (us == null) {
       return;
     }
 
+    Optional<Semaphore> writeThreadSemaphore = Optional.empty();
     boolean reserved = true;
+
     try {
       KeyExtent keyExtent = KeyExtent.fromThrift(tkeyExtent);
+
+      if (TabletType.type(keyExtent) == TabletType.USER) {
+        writeThreadSemaphore = server.getSemaphore();
+        // if write thread max is configured, get the Semaphore, otherwise do nothing
+        if (writeThreadSemaphore.isPresent()) {
+          Semaphore sem = writeThreadSemaphore.get();
+          if (sem.tryAcquire()) {
+            log.trace("Available permits: {}", sem.availablePermits());
+          } else
+            throw new TException("Mutation failed. No threads available.");

Review Comment:
   Do we really want to fail if no threads are available? Wouldn't we just want to block? If this fails, what happens to the client? Does this exception reach the client? Could that cause a lot of unnecessary RPC IO while the client retries?



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -394,6 +402,23 @@ private static long jitter() {
         * TabletServer.TIME_BETWEEN_LOCATOR_CACHE_CLEARS);
   }
 
+  public Optional<Semaphore> getSemaphore() {

Review Comment:
   This method could be named better to indicate what the Semaphore is for. It could also have a javadoc that would explain when the Optional might be empty.



##########
test/src/main/java/org/apache/accumulo/test/functional/WriteThreadsIT.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.apache.accumulo.core.conf.Property.TSERV_WRITE_THREADS_MAX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WriteThreadsIT extends AccumuloClusterHarness {
+
+  ThreadPoolExecutor tpe;
+  private static final Logger log = LoggerFactory.getLogger(WriteThreadsIT.class);
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // sets the thread limit on the SERVER SIDE
+    // default value is 0. when set to 0, there is no limit
+    cfg.setProperty(TSERV_WRITE_THREADS_MAX.getKey(), "10");
+  }
+
+  @Test
+  public void test() throws Exception {
+    write();
+  }
+
+  public void write() throws Exception {
+    // each thread creates a batch writer, adds mutations, and then flushes.
+    final int threadCount = 100;
+
+    // Reads and writes from Accumulo
+    BatchWriterConfig config = new BatchWriterConfig();
+    config.setMaxWriteThreads(1_000); // this is the max write threads on the CLIENT SIDE
+
+    try (AccumuloClient client =
+        Accumulo.newClient().from(getClientProps()).batchWriterConfig(config).build()) {
+
+      tpe = new ThreadPoolExecutor(threadCount, threadCount, 0, TimeUnit.SECONDS,
+          new ArrayBlockingQueue<>(threadCount));
+
+      final String[] tables = getUniqueNames(threadCount);
+
+      for (int i = 0; i < threadCount; i++) {
+        if (i % 10 == 0)
+          log.debug("iteration: {}", i);
+
+        final String tableName = tables[i];
+        client.tableOperations().create(tableName);
+
+        Runnable r = () -> {
+          try (BatchWriter writer = client.createBatchWriter(tableName)) {
+
+            // Data is written to mutation objects
+            List<Mutation> mutations = Stream.of("row1", "row2", "row3", "row4", "row5")
+                .map(Mutation::new).collect(Collectors.toList());
+
+            for (Mutation mutation : mutations) {
+              mutation.at().family("myColFam").qualifier("myColQual").put("myValue1");
+              mutation.at().family("myColFam").qualifier("myColQual").put("myValue2");
+              writer.addMutation(mutation);
+            }
+
+            // Sends buffered mutation to Accumulo immediately (write executed)
+            writer.flush();
+          } catch (Exception e) {
+            log.error("ERROR WRITING MUTATION", e);
+          }
+        };
+        // Runnable above is executed
+        tpe.execute(r);
+      }
+
+      tpe.shutdown();
+      assertTrue(tpe.awaitTermination(90, TimeUnit.SECONDS));
+
+      validateData(client, threadCount, tables);
+    }
+  }
+
+  private void validateData(AccumuloClient client, int threadCount, String[] tables)
+      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    for (int i = 0; i < threadCount; i++) {
+      try (var batchScanner = client.createBatchScanner(tables[i])) {
+        batchScanner.setRanges(List.of(new Range()));
+
+        List<String> rows = new ArrayList<>();
+        for (var e : batchScanner) {
+          rows.add(e.getKey().getRow().toString());
+        }

Review Comment:
   ```suggestion
           batchScanner.forEach((k, v) -> rows.add(k.getRow().toString()));
   ```



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -394,6 +402,23 @@ private static long jitter() {
         * TabletServer.TIME_BETWEEN_LOCATOR_CACHE_CLEARS);
   }
 
+  public Optional<Semaphore> getSemaphore() {
+    int configuredWriteThreadsMax =
+        getServerConfig().getConfiguration().getCount(Property.TSERV_WRITE_THREADS_MAX);
+    if (configuredWriteThreadsMax == MAX_WRITE_THREADS_DEFAULT) {
+      return Optional.empty();
+    } else {
+      synchronized (this) {
+        // if the value has changed, create a new semaphore
+        if (maxThreadPermits != configuredWriteThreadsMax) {
+          maxThreadPermits = configuredWriteThreadsMax;
+          writeThreadSemaphore = Optional.of(new Semaphore(maxThreadPermits));

Review Comment:
   When this configuration changes, the old Semaphore could still have permits held by in-progress write threads. Replacing it means that the number of concurrent write threads could exceed the limit (possibly by a lot) during this overlap.



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -394,6 +402,23 @@ private static long jitter() {
         * TabletServer.TIME_BETWEEN_LOCATOR_CACHE_CLEARS);
   }
 
+  public Optional<Semaphore> getSemaphore() {
+    int configuredWriteThreadsMax =
+        getServerConfig().getConfiguration().getCount(Property.TSERV_WRITE_THREADS_MAX);
+    if (configuredWriteThreadsMax == MAX_WRITE_THREADS_DEFAULT) {
+      return Optional.empty();

Review Comment:
   I don't understand the purpose of this variable. We don't want to return empty if it's equal to some arbitrary constant... only if it's `0`. I don't think we need this `MAX_WRITE_THREADS_DEFAULT` variable, or the added utility code used to read the default value.



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -377,6 +383,8 @@ public void run() {
     } else {
       authKeyWatcher = null;
     }
+    MAX_WRITE_THREADS_DEFAULT =
+        getServerConfig().getConfiguration().getDefaultCount(Property.TSERV_WRITE_THREADS_MAX);

Review Comment:
   I don't understand setting this. What's the difference between this new method you added and `DefaultConfiguration.getInstance().getCount(Property.TSERV_WRITE_THREADS_MAX)`?



##########
test/src/main/java/org/apache/accumulo/test/functional/WriteThreadsIT.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.apache.accumulo.core.conf.Property.TSERV_WRITE_THREADS_MAX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WriteThreadsIT extends AccumuloClusterHarness {
+
+  ThreadPoolExecutor tpe;
+  private static final Logger log = LoggerFactory.getLogger(WriteThreadsIT.class);
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // sets the thread limit on the SERVER SIDE
+    // default value is 0. when set to 0, there is no limit
+    cfg.setProperty(TSERV_WRITE_THREADS_MAX.getKey(), "10");
+  }
+
+  @Test
+  public void test() throws Exception {
+    write();
+  }
+
+  public void write() throws Exception {
+    // each thread creates a batch writer, adds mutations, and then flushes.
+    final int threadCount = 100;
+
+    // Reads and writes from Accumulo
+    BatchWriterConfig config = new BatchWriterConfig();
+    config.setMaxWriteThreads(1_000); // this is the max write threads on the CLIENT SIDE
+
+    try (AccumuloClient client =
+        Accumulo.newClient().from(getClientProps()).batchWriterConfig(config).build()) {
+
+      tpe = new ThreadPoolExecutor(threadCount, threadCount, 0, TimeUnit.SECONDS,
+          new ArrayBlockingQueue<>(threadCount));
+
+      final String[] tables = getUniqueNames(threadCount);
+
+      for (int i = 0; i < threadCount; i++) {
+        if (i % 10 == 0)
+          log.debug("iteration: {}", i);
+
+        final String tableName = tables[i];
+        client.tableOperations().create(tableName);
+
+        Runnable r = () -> {
+          try (BatchWriter writer = client.createBatchWriter(tableName)) {
+
+            // Data is written to mutation objects
+            List<Mutation> mutations = Stream.of("row1", "row2", "row3", "row4", "row5")
+                .map(Mutation::new).collect(Collectors.toList());
+
+            for (Mutation mutation : mutations) {
+              mutation.at().family("myColFam").qualifier("myColQual").put("myValue1");
+              mutation.at().family("myColFam").qualifier("myColQual").put("myValue2");
+              writer.addMutation(mutation);
+            }
+
+            // Sends buffered mutation to Accumulo immediately (write executed)
+            writer.flush();
+          } catch (Exception e) {
+            log.error("ERROR WRITING MUTATION", e);
+          }
+        };
+        // Runnable above is executed
+        tpe.execute(r);
+      }
+
+      tpe.shutdown();
+      assertTrue(tpe.awaitTermination(90, TimeUnit.SECONDS));
+
+      validateData(client, threadCount, tables);
+    }
+  }
+
+  private void validateData(AccumuloClient client, int threadCount, String[] tables)
+      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    for (int i = 0; i < threadCount; i++) {
+      try (var batchScanner = client.createBatchScanner(tables[i])) {
+        batchScanner.setRanges(List.of(new Range()));
+
+        List<String> rows = new ArrayList<>();
+        for (var e : batchScanner) {
+          rows.add(e.getKey().getRow().toString());
+        }
+        assertEquals(5, rows.size(), "Wrong number of rows returned.");
+        assertTrue(rows.containsAll(List.of("row1", "row2", "row3", "row4", "row5")));

Review Comment:
   I think a single assertEquals on the collected rows might make more sense than checking the size and then calling containsAll.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org