Posted to notifications@accumulo.apache.org by "keith-turner (via GitHub)" <gi...@apache.org> on 2023/05/24 21:33:24 UTC

[GitHub] [accumulo] keith-turner opened a new pull request, #3425: executes user initiated splits in manager

keith-turner opened a new pull request, #3425:
URL: https://github.com/apache/accumulo/pull/3425

   Modifies the user API for adding splits to execute a fate operation instead of calling the tablet server.  User-initiated splits can now happen without having to host a tablet.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [accumulo] keith-turner commented on a diff in pull request #3425: executes user initiated splits in manager

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3425:
URL: https://github.com/apache/accumulo/pull/3425#discussion_r1205804991


##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -706,6 +706,10 @@ TabletGoalState getGoalState(TabletMetadata tm, MergeInfo mergeInfo) {
         return TabletGoalState.UNASSIGNED;
       }
 
+      if (tm.getOperationId() != null) {

Review Comment:
   done in e2f4500 



##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java:
##########
@@ -217,10 +218,16 @@ private static boolean shouldReturnDueToSplit(final TabletMetadata tm,
         .collect(Collectors.summarizingLong(Long::longValue)).getSum() > splitThreshold;
   }
 
-  private static boolean shouldReturnDueToLocation(final TabletMetadata tm,
+  private boolean shouldReturnDueToLocation(final TabletMetadata tm,
       final Set<TableId> onlineTables, final Set<TServerInstance> current, final boolean debug) {
+
+    if (migrations.contains(tm.getExtent())) {
+      return true;
+    }
+
     // is the table supposed to be online or offline?
-    final boolean shouldBeOnline = onlineTables.contains(tm.getTableId());
+    final boolean shouldBeOnline =
+        onlineTables.contains(tm.getTableId()) && tm.getOperationId() == null;

Review Comment:
   done in e2f4500




[GitHub] [accumulo] keith-turner commented on pull request #3425: executes user initiated splits in manager

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on PR #3425:
URL: https://github.com/apache/accumulo/pull/3425#issuecomment-1562096726

   @dlmarion In facbdff I changed the new TabletManagementIterator to return NEEDS_LOCATION_UPDATE for tablets in the migrating set.  Previously it ignored tablets that the manager was migrating.  Tablets that are migrating should only need a location update, so I decided to have it return that.  I ran into this while doing some split testing.
   
   I also addressed #3408 in facbdff.  After the changes in #3409 it was easy to do, only a few lines, so I went ahead and threw it in since I was running split tests.
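
The location check discussed in this thread can be sketched in isolation. This is a minimal illustration, not the Accumulo code: `LocationCheckSketch`, its `Tablet` class, and the string-based sets are hypothetical stand-ins for `TabletMetadata`, `KeyExtent`, and `TableId`. The final comparison (return the tablet when its hosted state disagrees with the desired state) is an assumption, since that part of the method is not shown in the quoted diff.

```java
import java.util.Set;

// Hypothetical, simplified stand-ins for illustration; these are not the Accumulo classes.
class LocationCheckSketch {

  static final class Tablet {
    final String extent;      // stands in for KeyExtent
    final String tableId;     // stands in for TableId
    final String operationId; // null when no operation is running on the tablet
    final boolean hosted;     // true when the tablet currently has a location

    Tablet(String extent, String tableId, String operationId, boolean hosted) {
      this.extent = extent;
      this.tableId = tableId;
      this.operationId = operationId;
      this.hosted = hosted;
    }
  }

  static boolean shouldReturnDueToLocation(Tablet tm, Set<String> migrations,
      Set<String> onlineTables) {
    // Tablets in the migrating set are always returned.
    if (migrations.contains(tm.extent)) {
      return true;
    }
    // A tablet with an operation id should not be online, even if its table is.
    boolean shouldBeOnline = onlineTables.contains(tm.tableId) && tm.operationId == null;
    // Assumption (not shown in the diff): return the tablet when the hosted
    // state disagrees with the desired state.
    return shouldBeOnline != tm.hosted;
  }

  public static void main(String[] args) {
    Tablet splitting = new Tablet("1;m", "1", "SPLITTING:42", true);
    System.out.println(shouldReturnDueToLocation(splitting, Set.of(), Set.of("1")));
  }
}
```

Under these assumptions, a hosted tablet in an online table is left alone until it either enters the migrating set or acquires an operation id.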


[GitHub] [accumulo] keith-turner commented on a diff in pull request #3425: executes user initiated splits in manager

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3425:
URL: https://github.com/apache/accumulo/pull/3425#discussion_r1205727510


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java:
##########
@@ -435,198 +432,143 @@ String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,Strin
     }
   }
 
-  private static class SplitEnv {
-    private final String tableName;
-    private final TableId tableId;
-    private final ExecutorService executor;
-    private final CountDownLatch latch;
-    private final AtomicReference<Exception> exception;
-
-    SplitEnv(String tableName, TableId tableId, ExecutorService executor, CountDownLatch latch,
-        AtomicReference<Exception> exception) {
-      this.tableName = tableName;
-      this.tableId = tableId;
-      this.executor = executor;
-      this.latch = latch;
-      this.exception = exception;
-    }
-  }
-
-  private class SplitTask implements Runnable {
-
-    private List<Text> splits;
-    private SplitEnv env;
-
-    SplitTask(SplitEnv env, List<Text> splits) {
-      this.env = env;
-      this.splits = splits;
-    }
+  /**
+   * On the server side the fate operation will exit w/o an error if the tablet requested to split
+   * does not exist. When this happens it will also return an empty string. In the case where the
+   * fate operation successfully splits the tablet it will return the following string. This code
+   * uses this return value to see if it needs to retry finding the tablet.
+   */
+  public static final String SPLIT_SUCCESS_MSG = "SPLIT_SUCCEEDED";
 
-    @Override
-    public void run() {
-      try {
-        if (env.exception.get() != null) {
-          return;
-        }
+  @Override
+  public void addSplits(String tableName, SortedSet<Text> splits)
+      throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
 
-        if (splits.size() <= 2) {
-          addSplits(env, new TreeSet<>(splits));
-          splits.forEach(s -> env.latch.countDown());
-          return;
-        }
+    EXISTING_TABLE_NAME.validate(tableName);
 
-        int mid = splits.size() / 2;
+    TableId tableId = context.getTableId(tableName);
 
-        // split the middle split point to ensure that child task split
-        // different tablets and can therefore run in parallel
-        addSplits(env, new TreeSet<>(splits.subList(mid, mid + 1)));
-        env.latch.countDown();
+    // TODO should there be a server side check for this?
+    context.requireNotOffline(tableId, tableName);
 
-        env.executor.execute(new SplitTask(env, splits.subList(0, mid)));
-        env.executor.execute(new SplitTask(env, splits.subList(mid + 1, splits.size())));
+    ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, tableId);
 
-      } catch (Exception t) {
-        env.exception.compareAndSet(null, t);
-      }
-    }
+    SortedSet<Text> splitsTodo = new TreeSet<>(splits);
+    ExecutorService executor = context.threadPools().createFixedThreadPool(16, "addSplits", false);
+    try {
+      while (!splitsTodo.isEmpty()) {
 
-  }
+        tabLocator.invalidateCache();
 
-  @Override
-  public void addSplits(String tableName, SortedSet<Text> partitionKeys)
-      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
-    EXISTING_TABLE_NAME.validate(tableName);
+        Map<KeyExtent,List<Text>> tabletSplits =
+            mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
 
-    TableId tableId = context.getTableId(tableName);
-    List<Text> splits = new ArrayList<>(partitionKeys);
+        List<Future<List<Text>>> splitTasks = new ArrayList<>();
 
-    // should be sorted because we copied from a sorted set, but that makes
-    // assumptions about how the copy was done so resort to be sure.
-    Collections.sort(splits);
-    CountDownLatch latch = new CountDownLatch(splits.size());
-    AtomicReference<Exception> exception = new AtomicReference<>(null);
+        for (Entry<KeyExtent,List<Text>> splitsForTablet : tabletSplits.entrySet()) {
+          Callable<List<Text>> splitTask = createSplitTask(tableName, splitsForTablet);
+          splitTasks.add(executor.submit(splitTask));
+        }
 
-    ExecutorService executor = context.threadPools().createFixedThreadPool(16, "addSplits", false);
-    try {
-      executor.execute(
-          new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits));
-
-      while (!latch.await(100, MILLISECONDS)) {
-        if (exception.get() != null) {
-          executor.shutdownNow();
-          Throwable excep = exception.get();
-          // Below all exceptions are wrapped and rethrown. This is done so that the user knows what
-          // code path got them here. If the wrapping was not done, the
-          // user would only have the stack trace for the background thread.
-          if (excep instanceof TableNotFoundException) {
-            TableNotFoundException tnfe = (TableNotFoundException) excep;
-            throw new TableNotFoundException(tableId.canonical(), tableName,
-                "Table not found by background thread", tnfe);
-          } else if (excep instanceof TableOfflineException) {
-            log.debug("TableOfflineException occurred in background thread. Throwing new exception",
-                excep);
-            throw new TableOfflineException(tableId, tableName);
-          } else if (excep instanceof AccumuloSecurityException) {
-            // base == background accumulo security exception
-            AccumuloSecurityException base = (AccumuloSecurityException) excep;
-            throw new AccumuloSecurityException(base.getUser(), base.asThriftException().getCode(),
-                base.getTableInfo(), excep);
-          } else if (excep instanceof AccumuloServerException) {
-            throw new AccumuloServerException((AccumuloServerException) excep);
-          } else if (excep instanceof Error) {
-            throw new Error(excep);
-          } else {
-            throw new AccumuloException(excep);
+        for (var future : splitTasks) {
+          try {
+            var completedSplits = future.get();
+            completedSplits.forEach(splitsTodo::remove);
+          } catch (ExecutionException ee) {
+            Throwable excep = ee.getCause();
+            // Below all exceptions are wrapped and rethrown. This is done so that the user knows
+            // what
+            // code path got them here. If the wrapping was not done, the user would only have the
+            // stack trace for the background thread.
+            if (excep instanceof TableNotFoundException) {
+              TableNotFoundException tnfe = (TableNotFoundException) excep;
+              throw new TableNotFoundException(tableId.canonical(), tableName,
+                  "Table not found by background thread", tnfe);
+            } else if (excep instanceof TableOfflineException) {
+              log.debug(
+                  "TableOfflineException occurred in background thread. Throwing new exception",
+                  excep);
+              throw new TableOfflineException(tableId, tableName);
+            } else if (excep instanceof AccumuloSecurityException) {
+              // base == background accumulo security exception
+              AccumuloSecurityException base = (AccumuloSecurityException) excep;
+              throw new AccumuloSecurityException(base.getUser(),
+                  base.asThriftException().getCode(), base.getTableInfo(), excep);
+            } else if (excep instanceof AccumuloServerException) {
+              throw new AccumuloServerException((AccumuloServerException) excep);
+            } else {
+              throw new AccumuloException(excep);
+            }
+          } catch (InterruptedException e) {
+            throw new IllegalStateException(e);
           }
         }
       }
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(e);
     } finally {
-      executor.shutdown();
+      executor.shutdownNow();
     }
   }
 
-  private void addSplits(SplitEnv env, SortedSet<Text> partitionKeys)
-      throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
-      AccumuloServerException, InvalidTabletHostingRequestException {
-
-    ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, env.tableId);
-    for (Text split : partitionKeys) {
-      boolean successful = false;
-      int attempt = 0;
-      long locationFailures = 0;
+  private Map<KeyExtent,List<Text>> mapSplitsToTablets(String tableName, TableId tableId,
+      ClientTabletCache tabLocator, SortedSet<Text> splitsTodo)
+      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    Map<KeyExtent,List<Text>> tabletSplits = new HashMap<>();
 
-      while (!successful) {
+    var iterator = splitsTodo.iterator();
+    while (iterator.hasNext()) {
+      var split = iterator.next();
 
-        if (attempt > 0) {
-          sleepUninterruptibly(100, MILLISECONDS);
+      try {
+        var tablet = tabLocator.findTablet(context, split, false, LocationNeed.NOT_REQUIRED);
+        if (tablet == null) {
+          context.requireTableExists(tableId, tableName);

Review Comment:
   Yeah, the loop handles concurrent deletes, splits, and merges.  For a delete it bails; for a split or merge it retries and looks up the new tablet to split.
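
The retry behaviour described here can be sketched with a toy model. This is only an illustration under stated assumptions: `SplitRetrySketch`, `FakeTable`, and the `interference` hook are hypothetical, tablets are identified by end row alone (real extents also carry a previous end row), and the work runs sequentially rather than on a thread pool. Only the `SPLIT_SUCCEEDED` versus empty-string convention is taken from the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical toy model for illustration; not the Accumulo implementation.
class SplitRetrySketch {
  static final String SPLIT_SUCCESS_MSG = "SPLIT_SUCCEEDED";

  // Toy table: sorted tablet end rows; "~" stands in for the last tablet.
  static class FakeTable {
    final TreeSet<String> endRows = new TreeSet<>(List.of("~"));

    // Returns the end row of the tablet containing the row, or null if none.
    String findTablet(String row) {
      return endRows.ceiling(row);
    }

    // Splits the named tablet; returns "" if that tablet no longer exists.
    String split(String tabletEndRow, List<String> splits) {
      if (!endRows.contains(tabletEndRow)) {
        return "";
      }
      endRows.addAll(splits);
      return SPLIT_SUCCESS_MSG;
    }
  }

  // Returns the number of rounds needed. The interference hook runs once,
  // between lookup and split, to simulate a concurrent change.
  static int addSplits(FakeTable table, SortedSet<String> splits, Runnable interference) {
    SortedSet<String> splitsTodo = new TreeSet<>(splits);
    int rounds = 0;
    while (!splitsTodo.isEmpty()) {
      rounds++;
      // Map the remaining splits to tablets using a fresh lookup each round.
      Map<String,List<String>> tabletSplits = new HashMap<>();
      for (String split : splitsTodo) {
        String tablet = table.findTablet(split);
        if (tablet == null) {
          throw new IllegalStateException("Unable to find a tablet for split " + split);
        }
        tabletSplits.computeIfAbsent(tablet, k -> new ArrayList<>()).add(split);
      }
      if (rounds == 1) {
        interference.run();
      }
      // Only splits whose operation reported success are removed; the rest retry.
      for (Map.Entry<String,List<String>> e : tabletSplits.entrySet()) {
        if (SPLIT_SUCCESS_MSG.equals(table.split(e.getKey(), e.getValue()))) {
          e.getValue().forEach(splitsTodo::remove);
        }
      }
    }
    return rounds;
  }

  public static void main(String[] args) {
    FakeTable table = new FakeTable();
    table.endRows.add("m");
    // Simulate a concurrent merge that removes tablet "m" after the lookup.
    int rounds = addSplits(table, new TreeSet<>(List.of("c", "t")),
        () -> table.endRows.remove("m"));
    System.out.println(rounds + " rounds, end rows " + table.endRows);
  }
}
```

In this sketch the split at "c" fails in the first round because its tablet was merged away, stays in `splitsTodo`, and succeeds in the second round against the tablet found by the fresh lookup. A deleted table would instead surface as an exception from the lookup step, which is where the real code calls `requireTableExists`.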




[GitHub] [accumulo] dlmarion commented on a diff in pull request #3425: executes user initiated splits in manager

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3425:
URL: https://github.com/apache/accumulo/pull/3425#discussion_r1205362670


##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java:
##########
@@ -217,10 +218,16 @@ private static boolean shouldReturnDueToSplit(final TabletMetadata tm,
         .collect(Collectors.summarizingLong(Long::longValue)).getSum() > splitThreshold;
   }
 
-  private static boolean shouldReturnDueToLocation(final TabletMetadata tm,
+  private boolean shouldReturnDueToLocation(final TabletMetadata tm,
       final Set<TableId> onlineTables, final Set<TServerInstance> current, final boolean debug) {
+
+    if (migrations.contains(tm.getExtent())) {
+      return true;
+    }
+
     // is the table supposed to be online or offline?
-    final boolean shouldBeOnline = onlineTables.contains(tm.getTableId());
+    final boolean shouldBeOnline =
+        onlineTables.contains(tm.getTableId()) && tm.getOperationId() == null;

Review Comment:
   You should be able to modify TabletManagementIteratorIT to insert an OPID column into the tablet metadata and check that the tablet gets returned by the iterator.



##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -706,6 +706,10 @@ TabletGoalState getGoalState(TabletMetadata tm, MergeInfo mergeInfo) {
         return TabletGoalState.UNASSIGNED;
       }
 
+      if (tm.getOperationId() != null) {

Review Comment:
   You might be able to modify ManagerAssignmentIT.test to insert an OPID into the tablet metadata and test this.



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java:
##########
@@ -435,198 +432,143 @@ String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,Strin
     }
   }
 
-  private static class SplitEnv {
-    private final String tableName;
-    private final TableId tableId;
-    private final ExecutorService executor;
-    private final CountDownLatch latch;
-    private final AtomicReference<Exception> exception;
-
-    SplitEnv(String tableName, TableId tableId, ExecutorService executor, CountDownLatch latch,
-        AtomicReference<Exception> exception) {
-      this.tableName = tableName;
-      this.tableId = tableId;
-      this.executor = executor;
-      this.latch = latch;
-      this.exception = exception;
-    }
-  }
-
-  private class SplitTask implements Runnable {
-
-    private List<Text> splits;
-    private SplitEnv env;
-
-    SplitTask(SplitEnv env, List<Text> splits) {
-      this.env = env;
-      this.splits = splits;
-    }
+  /**
+   * On the server side the fate operation will exit w/o an error if the tablet requested to split
+   * does not exist. When this happens it will also return an empty string. In the case where the
+   * fate operation successfully splits the tablet it will return the following string. This code
+   * uses this return value to see if it needs to retry finding the tablet.
+   */
+  public static final String SPLIT_SUCCESS_MSG = "SPLIT_SUCCEEDED";
 
-    @Override
-    public void run() {
-      try {
-        if (env.exception.get() != null) {
-          return;
-        }
+  @Override
+  public void addSplits(String tableName, SortedSet<Text> splits)
+      throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
 
-        if (splits.size() <= 2) {
-          addSplits(env, new TreeSet<>(splits));
-          splits.forEach(s -> env.latch.countDown());
-          return;
-        }
+    EXISTING_TABLE_NAME.validate(tableName);
 
-        int mid = splits.size() / 2;
+    TableId tableId = context.getTableId(tableName);
 
-        // split the middle split point to ensure that child task split
-        // different tablets and can therefore run in parallel
-        addSplits(env, new TreeSet<>(splits.subList(mid, mid + 1)));
-        env.latch.countDown();
+    // TODO should there be a server side check for this?
+    context.requireNotOffline(tableId, tableName);
 
-        env.executor.execute(new SplitTask(env, splits.subList(0, mid)));
-        env.executor.execute(new SplitTask(env, splits.subList(mid + 1, splits.size())));
+    ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, tableId);
 
-      } catch (Exception t) {
-        env.exception.compareAndSet(null, t);
-      }
-    }
+    SortedSet<Text> splitsTodo = new TreeSet<>(splits);
+    ExecutorService executor = context.threadPools().createFixedThreadPool(16, "addSplits", false);
+    try {
+      while (!splitsTodo.isEmpty()) {
 
-  }
+        tabLocator.invalidateCache();
 
-  @Override
-  public void addSplits(String tableName, SortedSet<Text> partitionKeys)
-      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
-    EXISTING_TABLE_NAME.validate(tableName);
+        Map<KeyExtent,List<Text>> tabletSplits =
+            mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
 
-    TableId tableId = context.getTableId(tableName);
-    List<Text> splits = new ArrayList<>(partitionKeys);
+        List<Future<List<Text>>> splitTasks = new ArrayList<>();
 
-    // should be sorted because we copied from a sorted set, but that makes
-    // assumptions about how the copy was done so resort to be sure.
-    Collections.sort(splits);
-    CountDownLatch latch = new CountDownLatch(splits.size());
-    AtomicReference<Exception> exception = new AtomicReference<>(null);
+        for (Entry<KeyExtent,List<Text>> splitsForTablet : tabletSplits.entrySet()) {
+          Callable<List<Text>> splitTask = createSplitTask(tableName, splitsForTablet);
+          splitTasks.add(executor.submit(splitTask));
+        }
 
-    ExecutorService executor = context.threadPools().createFixedThreadPool(16, "addSplits", false);
-    try {
-      executor.execute(
-          new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits));
-
-      while (!latch.await(100, MILLISECONDS)) {
-        if (exception.get() != null) {
-          executor.shutdownNow();
-          Throwable excep = exception.get();
-          // Below all exceptions are wrapped and rethrown. This is done so that the user knows what
-          // code path got them here. If the wrapping was not done, the
-          // user would only have the stack trace for the background thread.
-          if (excep instanceof TableNotFoundException) {
-            TableNotFoundException tnfe = (TableNotFoundException) excep;
-            throw new TableNotFoundException(tableId.canonical(), tableName,
-                "Table not found by background thread", tnfe);
-          } else if (excep instanceof TableOfflineException) {
-            log.debug("TableOfflineException occurred in background thread. Throwing new exception",
-                excep);
-            throw new TableOfflineException(tableId, tableName);
-          } else if (excep instanceof AccumuloSecurityException) {
-            // base == background accumulo security exception
-            AccumuloSecurityException base = (AccumuloSecurityException) excep;
-            throw new AccumuloSecurityException(base.getUser(), base.asThriftException().getCode(),
-                base.getTableInfo(), excep);
-          } else if (excep instanceof AccumuloServerException) {
-            throw new AccumuloServerException((AccumuloServerException) excep);
-          } else if (excep instanceof Error) {
-            throw new Error(excep);
-          } else {
-            throw new AccumuloException(excep);
+        for (var future : splitTasks) {
+          try {
+            var completedSplits = future.get();
+            completedSplits.forEach(splitsTodo::remove);
+          } catch (ExecutionException ee) {
+            Throwable excep = ee.getCause();
+            // Below all exceptions are wrapped and rethrown. This is done so that the user knows
+            // what
+            // code path got them here. If the wrapping was not done, the user would only have the
+            // stack trace for the background thread.
+            if (excep instanceof TableNotFoundException) {
+              TableNotFoundException tnfe = (TableNotFoundException) excep;
+              throw new TableNotFoundException(tableId.canonical(), tableName,
+                  "Table not found by background thread", tnfe);
+            } else if (excep instanceof TableOfflineException) {
+              log.debug(
+                  "TableOfflineException occurred in background thread. Throwing new exception",
+                  excep);
+              throw new TableOfflineException(tableId, tableName);
+            } else if (excep instanceof AccumuloSecurityException) {
+              // base == background accumulo security exception
+              AccumuloSecurityException base = (AccumuloSecurityException) excep;
+              throw new AccumuloSecurityException(base.getUser(),
+                  base.asThriftException().getCode(), base.getTableInfo(), excep);
+            } else if (excep instanceof AccumuloServerException) {
+              throw new AccumuloServerException((AccumuloServerException) excep);
+            } else {
+              throw new AccumuloException(excep);
+            }
+          } catch (InterruptedException e) {
+            throw new IllegalStateException(e);
           }
         }
       }
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(e);
     } finally {
-      executor.shutdown();
+      executor.shutdownNow();
     }
   }
 
-  private void addSplits(SplitEnv env, SortedSet<Text> partitionKeys)
-      throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
-      AccumuloServerException, InvalidTabletHostingRequestException {
-
-    ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, env.tableId);
-    for (Text split : partitionKeys) {
-      boolean successful = false;
-      int attempt = 0;
-      long locationFailures = 0;
+  private Map<KeyExtent,List<Text>> mapSplitsToTablets(String tableName, TableId tableId,
+      ClientTabletCache tabLocator, SortedSet<Text> splitsTodo)
+      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    Map<KeyExtent,List<Text>> tabletSplits = new HashMap<>();
 
-      while (!successful) {
+    var iterator = splitsTodo.iterator();
+    while (iterator.hasNext()) {
+      var split = iterator.next();
 
-        if (attempt > 0) {
-          sleepUninterruptibly(100, MILLISECONDS);
+      try {
+        var tablet = tabLocator.findTablet(context, split, false, LocationNeed.NOT_REQUIRED);
+        if (tablet == null) {
+          context.requireTableExists(tableId, tableName);

Review Comment:
   Are you checking this inside the loop in case the table is deleted by something else concurrently?



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java:
##########
@@ -435,198 +432,143 @@ String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,Strin
     }
   }
 
-  private static class SplitEnv {
-    private final String tableName;
-    private final TableId tableId;
-    private final ExecutorService executor;
-    private final CountDownLatch latch;
-    private final AtomicReference<Exception> exception;
-
-    SplitEnv(String tableName, TableId tableId, ExecutorService executor, CountDownLatch latch,
-        AtomicReference<Exception> exception) {
-      this.tableName = tableName;
-      this.tableId = tableId;
-      this.executor = executor;
-      this.latch = latch;
-      this.exception = exception;
-    }
-  }
-
-  private class SplitTask implements Runnable {
-
-    private List<Text> splits;
-    private SplitEnv env;
-
-    SplitTask(SplitEnv env, List<Text> splits) {
-      this.env = env;
-      this.splits = splits;
-    }
+  /**
+   * On the server side the fate operation will exit w/o an error if the tablet requested to split
+   * does not exist. When this happens it will also return an empty string. In the case where the
+   * fate operation successfully splits the tablet it will return the following string. This code
+   * uses this return value to see if it needs to retry finding the tablet.
+   */
+  public static final String SPLIT_SUCCESS_MSG = "SPLIT_SUCCEEDED";
 
-    @Override
-    public void run() {
-      try {
-        if (env.exception.get() != null) {
-          return;
-        }
+  @Override
+  public void addSplits(String tableName, SortedSet<Text> splits)
+      throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
 
-        if (splits.size() <= 2) {
-          addSplits(env, new TreeSet<>(splits));
-          splits.forEach(s -> env.latch.countDown());
-          return;
-        }
+    EXISTING_TABLE_NAME.validate(tableName);
 
-        int mid = splits.size() / 2;
+    TableId tableId = context.getTableId(tableName);
 
-        // split the middle split point to ensure that child task split
-        // different tablets and can therefore run in parallel
-        addSplits(env, new TreeSet<>(splits.subList(mid, mid + 1)));
-        env.latch.countDown();
+    // TODO should there be a server side check for this?
+    context.requireNotOffline(tableId, tableName);
 
-        env.executor.execute(new SplitTask(env, splits.subList(0, mid)));
-        env.executor.execute(new SplitTask(env, splits.subList(mid + 1, splits.size())));
+    ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, tableId);
 
-      } catch (Exception t) {
-        env.exception.compareAndSet(null, t);
-      }
-    }
+    SortedSet<Text> splitsTodo = new TreeSet<>(splits);
+    ExecutorService executor = context.threadPools().createFixedThreadPool(16, "addSplits", false);
+    try {
+      while (!splitsTodo.isEmpty()) {
 
-  }
+        tabLocator.invalidateCache();
 
-  @Override
-  public void addSplits(String tableName, SortedSet<Text> partitionKeys)
-      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
-    EXISTING_TABLE_NAME.validate(tableName);
+        Map<KeyExtent,List<Text>> tabletSplits =
+            mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
 
-    TableId tableId = context.getTableId(tableName);
-    List<Text> splits = new ArrayList<>(partitionKeys);
+        List<Future<List<Text>>> splitTasks = new ArrayList<>();
 
-    // should be sorted because we copied from a sorted set, but that makes
-    // assumptions about how the copy was done so resort to be sure.
-    Collections.sort(splits);
-    CountDownLatch latch = new CountDownLatch(splits.size());
-    AtomicReference<Exception> exception = new AtomicReference<>(null);
+        for (Entry<KeyExtent,List<Text>> splitsForTablet : tabletSplits.entrySet()) {
+          Callable<List<Text>> splitTask = createSplitTask(tableName, splitsForTablet);
+          splitTasks.add(executor.submit(splitTask));
+        }
 
-    ExecutorService executor = context.threadPools().createFixedThreadPool(16, "addSplits", false);
-    try {
-      executor.execute(
-          new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits));
-
-      while (!latch.await(100, MILLISECONDS)) {
-        if (exception.get() != null) {
-          executor.shutdownNow();
-          Throwable excep = exception.get();
-          // Below all exceptions are wrapped and rethrown. This is done so that the user knows what
-          // code path got them here. If the wrapping was not done, the
-          // user would only have the stack trace for the background thread.
-          if (excep instanceof TableNotFoundException) {
-            TableNotFoundException tnfe = (TableNotFoundException) excep;
-            throw new TableNotFoundException(tableId.canonical(), tableName,
-                "Table not found by background thread", tnfe);
-          } else if (excep instanceof TableOfflineException) {
-            log.debug("TableOfflineException occurred in background thread. Throwing new exception",
-                excep);
-            throw new TableOfflineException(tableId, tableName);
-          } else if (excep instanceof AccumuloSecurityException) {
-            // base == background accumulo security exception
-            AccumuloSecurityException base = (AccumuloSecurityException) excep;
-            throw new AccumuloSecurityException(base.getUser(), base.asThriftException().getCode(),
-                base.getTableInfo(), excep);
-          } else if (excep instanceof AccumuloServerException) {
-            throw new AccumuloServerException((AccumuloServerException) excep);
-          } else if (excep instanceof Error) {
-            throw new Error(excep);
-          } else {
-            throw new AccumuloException(excep);
+        for (var future : splitTasks) {
+          try {
+            var completedSplits = future.get();
+            completedSplits.forEach(splitsTodo::remove);
+          } catch (ExecutionException ee) {
+            Throwable excep = ee.getCause();
+            // Below all exceptions are wrapped and rethrown. This is done so that the user knows
+            // what
+            // code path got them here. If the wrapping was not done, the user would only have the
+            // stack trace for the background thread.
+            if (excep instanceof TableNotFoundException) {
+              TableNotFoundException tnfe = (TableNotFoundException) excep;
+              throw new TableNotFoundException(tableId.canonical(), tableName,
+                  "Table not found by background thread", tnfe);
+            } else if (excep instanceof TableOfflineException) {
+              log.debug(
+                  "TableOfflineException occurred in background thread. Throwing new exception",
+                  excep);
+              throw new TableOfflineException(tableId, tableName);
+            } else if (excep instanceof AccumuloSecurityException) {
+              // base == background accumulo security exception
+              AccumuloSecurityException base = (AccumuloSecurityException) excep;
+              throw new AccumuloSecurityException(base.getUser(),
+                  base.asThriftException().getCode(), base.getTableInfo(), excep);
+            } else if (excep instanceof AccumuloServerException) {
+              throw new AccumuloServerException((AccumuloServerException) excep);
+            } else {
+              throw new AccumuloException(excep);
+            }
+          } catch (InterruptedException e) {
+            throw new IllegalStateException(e);
           }
         }
       }
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(e);
     } finally {
-      executor.shutdown();
+      executor.shutdownNow();
     }
   }
 
-  private void addSplits(SplitEnv env, SortedSet<Text> partitionKeys)
-      throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
-      AccumuloServerException, InvalidTabletHostingRequestException {
-
-    ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, env.tableId);
-    for (Text split : partitionKeys) {
-      boolean successful = false;
-      int attempt = 0;
-      long locationFailures = 0;
+  private Map<KeyExtent,List<Text>> mapSplitsToTablets(String tableName, TableId tableId,
+      ClientTabletCache tabLocator, SortedSet<Text> splitsTodo)
+      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    Map<KeyExtent,List<Text>> tabletSplits = new HashMap<>();
 
-      while (!successful) {
+    var iterator = splitsTodo.iterator();
+    while (iterator.hasNext()) {
+      var split = iterator.next();
 
-        if (attempt > 0) {
-          sleepUninterruptibly(100, MILLISECONDS);
+      try {
+        var tablet = tabLocator.findTablet(context, split, false, LocationNeed.NOT_REQUIRED);
+        if (tablet == null) {
+          context.requireTableExists(tableId, tableName);
+          throw new IllegalStateException("Unable to find a tablet for split " + split
+              + " int table " + tableName + " " + tableId);

Review Comment:
   ```suggestion
                 + " in table " + tableName + " " + tableId);
   ```




[GitHub] [accumulo] keith-turner merged pull request #3425: executes user initiated splits in manager

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner merged PR #3425:
URL: https://github.com/apache/accumulo/pull/3425

