You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by "keith-turner (via GitHub)" <gi...@apache.org> on 2023/05/25 15:58:44 UTC

[GitHub] [accumulo] keith-turner commented on a diff in pull request #3425: executes user initiated splits in manager

keith-turner commented on code in PR #3425:
URL: https://github.com/apache/accumulo/pull/3425#discussion_r1205727510


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java:
##########
@@ -435,198 +432,143 @@ String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,Strin
     }
   }
 
-  private static class SplitEnv {
-    private final String tableName;
-    private final TableId tableId;
-    private final ExecutorService executor;
-    private final CountDownLatch latch;
-    private final AtomicReference<Exception> exception;
-
-    SplitEnv(String tableName, TableId tableId, ExecutorService executor, CountDownLatch latch,
-        AtomicReference<Exception> exception) {
-      this.tableName = tableName;
-      this.tableId = tableId;
-      this.executor = executor;
-      this.latch = latch;
-      this.exception = exception;
-    }
-  }
-
-  private class SplitTask implements Runnable {
-
-    private List<Text> splits;
-    private SplitEnv env;
-
-    SplitTask(SplitEnv env, List<Text> splits) {
-      this.env = env;
-      this.splits = splits;
-    }
+  /**
+   * On the server side the fate operation will exit w/o an error if the tablet requested to split
+   * does not exist. When this happens it will also return an empty string. In the case where the
+   * fate operation successfully splits the tablet it will return the following string. This code
+   * uses this return value to see if it needs to retry finding the tablet.
+   */
+  public static final String SPLIT_SUCCESS_MSG = "SPLIT_SUCCEEDED";
 
-    @Override
-    public void run() {
-      try {
-        if (env.exception.get() != null) {
-          return;
-        }
+  @Override
+  public void addSplits(String tableName, SortedSet<Text> splits)
+      throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
 
-        if (splits.size() <= 2) {
-          addSplits(env, new TreeSet<>(splits));
-          splits.forEach(s -> env.latch.countDown());
-          return;
-        }
+    EXISTING_TABLE_NAME.validate(tableName);
 
-        int mid = splits.size() / 2;
+    TableId tableId = context.getTableId(tableName);
 
-        // split the middle split point to ensure that child task split
-        // different tablets and can therefore run in parallel
-        addSplits(env, new TreeSet<>(splits.subList(mid, mid + 1)));
-        env.latch.countDown();
+    // TODO should there be a server side check for this?
+    context.requireNotOffline(tableId, tableName);
 
-        env.executor.execute(new SplitTask(env, splits.subList(0, mid)));
-        env.executor.execute(new SplitTask(env, splits.subList(mid + 1, splits.size())));
+    ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, tableId);
 
-      } catch (Exception t) {
-        env.exception.compareAndSet(null, t);
-      }
-    }
+    SortedSet<Text> splitsTodo = new TreeSet<>(splits);
+    ExecutorService executor = context.threadPools().createFixedThreadPool(16, "addSplits", false);
+    try {
+      while (!splitsTodo.isEmpty()) {
 
-  }
+        tabLocator.invalidateCache();
 
-  @Override
-  public void addSplits(String tableName, SortedSet<Text> partitionKeys)
-      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
-    EXISTING_TABLE_NAME.validate(tableName);
+        Map<KeyExtent,List<Text>> tabletSplits =
+            mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
 
-    TableId tableId = context.getTableId(tableName);
-    List<Text> splits = new ArrayList<>(partitionKeys);
+        List<Future<List<Text>>> splitTasks = new ArrayList<>();
 
-    // should be sorted because we copied from a sorted set, but that makes
-    // assumptions about how the copy was done so resort to be sure.
-    Collections.sort(splits);
-    CountDownLatch latch = new CountDownLatch(splits.size());
-    AtomicReference<Exception> exception = new AtomicReference<>(null);
+        for (Entry<KeyExtent,List<Text>> splitsForTablet : tabletSplits.entrySet()) {
+          Callable<List<Text>> splitTask = createSplitTask(tableName, splitsForTablet);
+          splitTasks.add(executor.submit(splitTask));
+        }
 
-    ExecutorService executor = context.threadPools().createFixedThreadPool(16, "addSplits", false);
-    try {
-      executor.execute(
-          new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits));
-
-      while (!latch.await(100, MILLISECONDS)) {
-        if (exception.get() != null) {
-          executor.shutdownNow();
-          Throwable excep = exception.get();
-          // Below all exceptions are wrapped and rethrown. This is done so that the user knows what
-          // code path got them here. If the wrapping was not done, the
-          // user would only have the stack trace for the background thread.
-          if (excep instanceof TableNotFoundException) {
-            TableNotFoundException tnfe = (TableNotFoundException) excep;
-            throw new TableNotFoundException(tableId.canonical(), tableName,
-                "Table not found by background thread", tnfe);
-          } else if (excep instanceof TableOfflineException) {
-            log.debug("TableOfflineException occurred in background thread. Throwing new exception",
-                excep);
-            throw new TableOfflineException(tableId, tableName);
-          } else if (excep instanceof AccumuloSecurityException) {
-            // base == background accumulo security exception
-            AccumuloSecurityException base = (AccumuloSecurityException) excep;
-            throw new AccumuloSecurityException(base.getUser(), base.asThriftException().getCode(),
-                base.getTableInfo(), excep);
-          } else if (excep instanceof AccumuloServerException) {
-            throw new AccumuloServerException((AccumuloServerException) excep);
-          } else if (excep instanceof Error) {
-            throw new Error(excep);
-          } else {
-            throw new AccumuloException(excep);
+        for (var future : splitTasks) {
+          try {
+            var completedSplits = future.get();
+            completedSplits.forEach(splitsTodo::remove);
+          } catch (ExecutionException ee) {
+            Throwable excep = ee.getCause();
+            // Below all exceptions are wrapped and rethrown. This is done so that the user knows
+            // what
+            // code path got them here. If the wrapping was not done, the user would only have the
+            // stack trace for the background thread.
+            if (excep instanceof TableNotFoundException) {
+              TableNotFoundException tnfe = (TableNotFoundException) excep;
+              throw new TableNotFoundException(tableId.canonical(), tableName,
+                  "Table not found by background thread", tnfe);
+            } else if (excep instanceof TableOfflineException) {
+              log.debug(
+                  "TableOfflineException occurred in background thread. Throwing new exception",
+                  excep);
+              throw new TableOfflineException(tableId, tableName);
+            } else if (excep instanceof AccumuloSecurityException) {
+              // base == background accumulo security exception
+              AccumuloSecurityException base = (AccumuloSecurityException) excep;
+              throw new AccumuloSecurityException(base.getUser(),
+                  base.asThriftException().getCode(), base.getTableInfo(), excep);
+            } else if (excep instanceof AccumuloServerException) {
+              throw new AccumuloServerException((AccumuloServerException) excep);
+            } else {
+              throw new AccumuloException(excep);
+            }
+          } catch (InterruptedException e) {
+            throw new IllegalStateException(e);
           }
         }
       }
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(e);
     } finally {
-      executor.shutdown();
+      executor.shutdownNow();
     }
   }
 
-  private void addSplits(SplitEnv env, SortedSet<Text> partitionKeys)
-      throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
-      AccumuloServerException, InvalidTabletHostingRequestException {
-
-    ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, env.tableId);
-    for (Text split : partitionKeys) {
-      boolean successful = false;
-      int attempt = 0;
-      long locationFailures = 0;
+  private Map<KeyExtent,List<Text>> mapSplitsToTablets(String tableName, TableId tableId,
+      ClientTabletCache tabLocator, SortedSet<Text> splitsTodo)
+      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    Map<KeyExtent,List<Text>> tabletSplits = new HashMap<>();
 
-      while (!successful) {
+    var iterator = splitsTodo.iterator();
+    while (iterator.hasNext()) {
+      var split = iterator.next();
 
-        if (attempt > 0) {
-          sleepUninterruptibly(100, MILLISECONDS);
+      try {
+        var tablet = tabLocator.findTablet(context, split, false, LocationNeed.NOT_REQUIRED);
+        if (tablet == null) {
+          context.requireTableExists(tableId, tableName);

Review Comment:
   Yeah, the loop handles concurrent deletes, splits, and merges. For a delete it bails; for a split or merge it will try to find the new tablet to split.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org