You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2023/05/18 18:47:14 UTC

[accumulo] branch elasticity updated: Moved automatic splits to the manager (#3382)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 808c0bb5cc Moved automatic splits to the manager (#3382)
808c0bb5cc is described below

commit 808c0bb5cc28511da6b50676a34f1abf36b7d20c
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu May 18 14:47:08 2023 -0400

    Moved automatic splits to the manager (#3382)
    
    The Manager now scans tablets looking for ones that need to split.  When
    it finds one it queues it on a thread pool for inspection of files to
    find a split point.  When split points are found a FATE operation is
    initiated to do the splits.  The FATE operation uses conditional
    mutations to synchronize access to the tablet metadata.  The FATE
    operation creates one or more new tablets in the metadata table. Unlike
    the old split code which could only split a tablet into two, this
    new code can split a tablet into however many are needed in a single
    operation.  This makes responding to a change in the split threshold
    much faster.
    
    The old code used to shorten split points; this change carries that
    behavior forward in the new code to find multiple split points.
    
    This change does not include moving user-initiated splits into the
    manager; that will be done in a follow-on change.
---
 .../org/apache/accumulo/core/conf/Property.java    |   2 +
 .../accumulo/core/metadata/schema/Ample.java       |   2 +-
 .../core/metadata/schema/DataFileValue.java        |   2 +-
 .../core/metadata/schema/TabletsMetadata.java      |   4 +
 .../accumulo/core/util/threads/ThreadPools.java    |   3 +
 .../server/manager/state/CurrentState.java         |   5 +
 .../server/manager/state/MetaDataTableScanner.java |   8 +-
 .../metadata/ConditionalTabletsMutatorImpl.java    |  19 ++
 .../server/metadata/TabletMutatorBase.java         |   2 +-
 .../server/tablets/UniqueNameAllocator.java        |  11 +
 .../org/apache/accumulo/server/util/FileUtil.java  |  21 +-
 .../java/org/apache/accumulo/manager/Manager.java  |  59 ++++-
 .../apache/accumulo/manager/TabletOperations.java  |  25 --
 .../accumulo/manager/split/SplitScanner.java       |  96 +++++++
 .../apache/accumulo/manager/split/SplitTask.java   |  89 +++++++
 .../apache/accumulo/manager/split/SplitUtils.java  | 292 +++++++++++++++++++++
 .../apache/accumulo/manager/split/Splitter.java    | 222 ++++++++++++++++
 .../manager/tableOps/create/PopulateMetadata.java  |   2 +-
 .../manager/tableOps/goal/SetHostingGoal.java      |   2 +-
 .../manager/tableOps/split/DeleteOperationIds.java |  74 ++++++
 .../accumulo/manager/tableOps/split/PreSplit.java  | 157 +++++++++++
 .../accumulo/manager/tableOps/split/SplitInfo.java |  90 +++++++
 .../manager/tableOps/split/UpdateTablets.java      | 261 ++++++++++++++++++
 .../accumulo/manager/upgrade/Upgrader11to12.java   |   4 +-
 .../accumulo/manager/split/SplitUtilsTest.java     | 230 ++++++++++++++++
 .../accumulo/manager/split/SplitterTest.java       | 101 +++++++
 .../manager/tableOps/split/UpdateTabletsTest.java  | 116 ++++++++
 .../org/apache/accumulo/tserver/tablet/Tablet.java |  21 +-
 .../test/functional/BulkSplitOptimizationIT.java   |   1 +
 .../apache/accumulo/test/functional/SplitIT.java   |   6 +-
 30 files changed, 1869 insertions(+), 58 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 3e74961bf3..de0170c2d4 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -361,6 +361,8 @@ public enum Property {
           + "indefinitely. Default is 0 to block indefinitely. Only valid when tserver available "
           + "threshold is set greater than 0. Added with version 1.10",
       "1.10.0"),
+  MANAGER_SPLIT_WORKER_THREADS("manager.split.inspection.threadpool.size", "8", PropertyType.COUNT,
+      "The number of threads used to inspect tablets files to find split points.", "4.0.0"),
   // properties that are specific to scan server behavior
   @Experimental
   SSERV_PREFIX("sserver.", null, PropertyType.PREFIX,
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index ad05546c4a..3d03853696 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -339,7 +339,7 @@ public interface Ample {
 
     T deleteExternalCompaction(ExternalCompactionId ecid);
 
-    T setHostingGoal(TabletHostingGoal goal);
+    T putHostingGoal(TabletHostingGoal goal);
 
     T setHostingRequested();
 
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java
index fe770d96c0..4f87b3d413 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java
@@ -94,7 +94,7 @@ public class DataFileValue {
     if (o instanceof DataFileValue) {
       DataFileValue odfv = (DataFileValue) o;
 
-      return size == odfv.size && numEntries == odfv.numEntries;
+      return size == odfv.size && numEntries == odfv.numEntries && time == odfv.time;
     }
 
     return false;
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
index 9c57404dd8..11bad3ab49 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
@@ -25,6 +25,7 @@ import static java.util.stream.Collectors.toList;
 import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN;
 import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
 import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN;
 import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
 import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
 
@@ -343,6 +344,9 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
           case ECOMP:
             families.add(ExternalCompactionColumnFamily.NAME);
             break;
+          case OPID:
+            qualifiers.add(OPID_COLUMN);
+            break;
           default:
             throw new IllegalArgumentException("Unknown col type " + colToFetch);
 
diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 585daa1df2..bea6e32de1 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -283,6 +283,9 @@ public class ThreadPools {
             "summary partition", emitThreadPoolMetrics);
       case GC_DELETE_THREADS:
         return createFixedThreadPool(conf.getCount(p), "deleting", emitThreadPoolMetrics);
+      case MANAGER_SPLIT_WORKER_THREADS:
+        return createFixedThreadPool(conf.getCount(p), "tablet split inspection",
+            emitThreadPoolMetrics);
       default:
         throw new RuntimeException("Unhandled thread pool property: " + p);
     }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java
index 7fb883efa1..397f9b4d0e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java
@@ -42,5 +42,10 @@ public interface CurrentState {
    */
   Set<KeyExtent> migrationsSnapshot();
 
+  // ELASTICITY_TODO this approach to requesting unassignments was a quick hack
+  default Set<KeyExtent> getUnassignmentRequest() {
+    throw new UnsupportedOperationException();
+  }
+
   ManagerState getManagerState();
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataTableScanner.java
index d9b2816efb..8406fe4157 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataTableScanner.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataTableScanner.java
@@ -62,6 +62,8 @@ import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+
 public class MetaDataTableScanner implements ClosableIterator<TabletLocationState> {
   private static final Logger log = LoggerFactory.getLogger(MetaDataTableScanner.class);
 
@@ -100,7 +102,11 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
       TabletStateChangeIterator.setCurrentServers(tabletChange, state.onlineTabletServers());
       TabletStateChangeIterator.setOnlineTables(tabletChange, state.onlineTables());
       TabletStateChangeIterator.setMerges(tabletChange, state.merges());
-      TabletStateChangeIterator.setMigrations(tabletChange, state.migrationsSnapshot());
+      // ELASTICITY_TODO passing the unassignment request as part of the migrations is a hack. Was
+      // not sure of the entire unassignment request approach and did not want to push it further
+      // into the code.
+      TabletStateChangeIterator.setMigrations(tabletChange,
+          Sets.union(state.migrationsSnapshot(), state.getUnassignmentRequest()));
       TabletStateChangeIterator.setManagerState(tabletChange, state.getManagerState());
       TabletStateChangeIterator.setShuttingDown(tabletChange, state.shutdownServers());
     }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
index 9129c22f1c..6e97bbec9e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
@@ -36,13 +36,18 @@ import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMutator {
 
+  private static final Logger log = LoggerFactory.getLogger(ConditionalTabletsMutatorImpl.class);
+
   private final ServerContext context;
   private Ample.DataLevel dataLevel = null;
 
@@ -131,7 +136,21 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
           resultsMap.put(extents.get(row), result);
         }
 
+        var extentsSet = Set.copyOf(extents.values());
         if (!resultsMap.keySet().equals(Set.copyOf(extents.values()))) {
+          // ELASTICITY_TODO this check can trigger if someone forgets to submit, could check for
+          // that
+
+          Sets.difference(resultsMap.keySet(), extentsSet)
+              .forEach(extent -> log.error("Unexpected extent seen in in result {}", extent));
+
+          Sets.difference(extentsSet, resultsMap.keySet())
+              .forEach(extent -> log.error("Expected extent not seen in result {}", extent));
+
+          resultsMap.forEach((keyExtent, result) -> {
+            log.error("result seen {} {}", keyExtent, new Text(result.getMutation().getRow()));
+          });
+
           throw new AssertionError("Not all extents were seen, this is unexpected");
         }
 
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
index 02bb9b8f62..a19905e3d2 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
@@ -272,7 +272,7 @@ public abstract class TabletMutatorBase<T extends Ample.TabletUpdates<T>>
   }
 
   @Override
-  public T setHostingGoal(TabletHostingGoal goal) {
+  public T putHostingGoal(TabletHostingGoal goal) {
     HostingColumnFamily.GOAL_COLUMN.put(mutation, TabletHostingGoalUtil.toValue(goal));
     return getThis();
   }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java b/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
index 03eb9229bf..889467ef11 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
@@ -23,8 +23,10 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import java.security.SecureRandom;
 
 import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.util.FastFormat;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.hadoop.io.Text;
 
 /**
  * Allocates unique names for an accumulo instance. The names are unique for the lifetime of the
@@ -45,6 +47,15 @@ public class UniqueNameAllocator {
     nextNamePath = Constants.ZROOT + "/" + context.getInstanceID() + Constants.ZNEXT_FILE;
   }
 
+  public static String createTabletDirectoryName(ServerContext context, Text endRow) {
+    if (endRow == null) {
+      return MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME;
+    } else {
+      UniqueNameAllocator namer = context.getUniqueNameAllocator();
+      return Constants.GENERATED_TABLET_DIRECTORY_PREFIX + namer.getNextName();
+    }
+  }
+
   public synchronized String getNextName() {
 
     while (next >= maxAllocated) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
index 3b7d331e29..996316df3c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
@@ -60,20 +60,20 @@ public class FileUtil {
   private static final SecureRandom random = new SecureRandom();
 
   public static class FileInfo {
-    Key firstKey = new Key();
-    Key lastKey = new Key();
+    private final Text firstKey;
+    private final Text lastKey;
 
     public FileInfo(Key firstKey, Key lastKey) {
-      this.firstKey = firstKey;
-      this.lastKey = lastKey;
+      this.firstKey = firstKey.getRow();
+      this.lastKey = lastKey.getRow();
     }
 
     public Text getFirstRow() {
-      return firstKey.getRow();
+      return firstKey;
     }
 
     public Text getLastRow() {
-      return lastKey.getRow();
+      return lastKey;
     }
   }
 
@@ -518,8 +518,13 @@ public class FileUtil {
 
     long t2 = System.currentTimeMillis();
 
-    log.debug(String.format("Found first and last keys for %d data files in %6.2f secs",
-        dataFiles.size(), (t2 - t1) / 1000.0));
+    String message = String.format("Found first and last keys for %d data files in %6.2f secs",
+        dataFiles.size(), (t2 - t1) / 1000.0);
+    if (t2 - t1 > 500) {
+      log.debug(message);
+    } else {
+      log.trace(message);
+    }
 
     return dataFilesInfo;
   }
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 5ce57ad57e..fbacf5ee59 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -114,6 +114,7 @@ import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.manager.metrics.ManagerMetrics;
 import org.apache.accumulo.manager.recovery.RecoveryManager;
+import org.apache.accumulo.manager.split.Splitter;
 import org.apache.accumulo.manager.state.TableCounts;
 import org.apache.accumulo.manager.tableOps.TraceRepo;
 import org.apache.accumulo.manager.upgrade.PreUpgradeValidation;
@@ -246,7 +247,7 @@ public class Manager extends AbstractServer
    *
    * @return the Fate object, only after the fate components are running and ready
    */
-  Fate<Manager> fate() {
+  public Fate<Manager> fate() {
     try {
       // block up to 30 seconds until it's ready; if it's still not ready, introduce some logging
       if (!fateReadyLatch.await(30, SECONDS)) {
@@ -569,6 +570,52 @@ public class Manager extends AbstractServer
     }
   }
 
+  // ELASTICITY_TODO the following should probably be a cache w/ timeout so that things that are
+  // never removed will age off. Unsure about the approach, so want to hold off on doing more
+  // work. It was a quick hack added so that splits could get the manager to unassign tablets.
+  private final Map<KeyExtent,Set<Long>> unassignmentRequest =
+      Collections.synchronizedMap(new HashMap<>());
+
+  @Override
+  public Set<KeyExtent> getUnassignmentRequest() {
+    synchronized (unassignmentRequest) {
+      return Set.copyOf(unassignmentRequest.keySet());
+    }
+  }
+
+  public void requestUnassignment(KeyExtent tablet, long fateTxId) {
+    unassignmentRequest.compute(tablet, (k, v) -> {
+      Set<Long> txids = v == null ? new HashSet<>() : v;
+      txids.add(fateTxId);
+      return txids;
+    });
+
+    nextEvent.event("Unassignment requested %s", tablet);
+  }
+
+  public void cancelUnassignmentRequest(KeyExtent tablet, long fateTxid) {
+    unassignmentRequest.computeIfPresent(tablet, (k, v) -> {
+      v.remove(fateTxid);
+      if (v.isEmpty()) {
+        return null;
+      }
+
+      return v;
+    });
+
+    nextEvent.event("Unassignment request canceled %s", tablet);
+  }
+
+  public boolean isUnassignmentRequested(KeyExtent extent) {
+    return unassignmentRequest.containsKey(extent);
+  }
+
+  private Splitter splitter;
+
+  public Splitter getSplitter() {
+    return splitter;
+  }
+
   enum TabletGoalState {
     HOSTED(TUnloadTabletGoal.UNKNOWN),
     UNASSIGNED(TUnloadTabletGoal.UNASSIGNED),
@@ -645,6 +692,7 @@ public class Manager extends AbstractServer
     KeyExtent extent = tls.extent;
     // Shutting down?
     TabletGoalState state = getSystemGoalState(tls);
+
     if (state == TabletGoalState.HOSTED) {
       if (!upgradeCoordinator.getStatus().isParentLevelUpgraded(extent)) {
         // The place where this tablet stores its metadata was not upgraded, so do not assign this
@@ -652,6 +700,10 @@ public class Manager extends AbstractServer
         return TabletGoalState.UNASSIGNED;
       }
 
+      if (unassignmentRequest.containsKey(extent)) {
+        return TabletGoalState.UNASSIGNED;
+      }
+
       if (tls.current != null && serversToShutdown.contains(tls.current.getServerInstance())) {
         return TabletGoalState.SUSPENDED;
       }
@@ -1274,6 +1326,9 @@ public class Manager extends AbstractServer
       throw new IllegalStateException("Exception updating manager lock", e);
     }
 
+    this.splitter = new Splitter(context, DataLevel.USER, this);
+    this.splitter.start();
+
     while (!clientService.isServing()) {
       sleepUninterruptibly(100, MILLISECONDS);
     }
@@ -1287,6 +1342,8 @@ public class Manager extends AbstractServer
     log.info("Shutting down fate.");
     fate().shutdown();
 
+    splitter.stop();
+
     final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME;
     try {
       statusThread.join(remaining(deadline));
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletOperations.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletOperations.java
deleted file mode 100644
index 1feb9e459c..0000000000
--- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletOperations.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.manager;
-
-import org.apache.accumulo.core.dataImpl.KeyExtent;
-
-public interface TabletOperations {
-  AutoCloseable unassign(KeyExtent tablet);
-}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitScanner.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitScanner.java
new file mode 100644
index 0000000000..0d11e97944
--- /dev/null
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitScanner.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.split;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.manager.state.tables.TableState;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.server.ServerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+// ELASTICITY_TODO explore moving this functionality into TabletGroupWatcher (also consider the
+// compaction scan).  Could we do server side filtering to find candidates?  In addition to or
+// independently of moving code to the tablet group watcher, we could sum up files sizes on the
+// tablet server using an accumulo iterator and return on the sum.
+public class SplitScanner implements Runnable {
+
+  private static final Logger log = LoggerFactory.getLogger(SplitScanner.class);
+
+  private final Ample.DataLevel level;
+  private final ServerContext context;
+
+  private final Manager manager;
+
+  public SplitScanner(ServerContext context, Ample.DataLevel level, Manager manager) {
+    Preconditions.checkArgument(level != Ample.DataLevel.ROOT);
+    this.context = context;
+    this.level = level;
+    this.manager = manager;
+  }
+
+  @Override
+  public void run() {
+    var tablets = context.getAmple().readTablets().forLevel(level)
+        .fetch(ColumnType.FILES, ColumnType.OPID, ColumnType.PREV_ROW, ColumnType.LOCATION).build();
+
+    TableId lastTableId = null;
+    long threshold = Long.MAX_VALUE;
+
+    boolean isOnline = true;
+
+    for (TabletMetadata tablet : tablets) {
+      if (tablet.getOperationId() != null || !manager.getSplitter().shouldInspect(tablet)) {
+        log.debug("ignoring for split inspection {} {} {}", tablet.getExtent(),
+            tablet.getOperationId(), !manager.getSplitter().shouldInspect(tablet));
+        continue;
+      }
+
+      if (lastTableId == null || !lastTableId.equals(tablet.getTableId())) {
+        threshold = context.getTableConfiguration(tablet.getTableId())
+            .getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
+        lastTableId = tablet.getTableId();
+
+        isOnline = context.getTableState(lastTableId) == TableState.ONLINE;
+      }
+
+      if (!isOnline) {
+        continue;
+      }
+
+      var tabletSize =
+          tablet.getFilesMap().values().stream().mapToLong(DataFileValue::getSize).sum();
+
+      if (tabletSize > threshold) {
+        if (manager.getSplitter().addSplitStarting(tablet.getExtent())) {
+          log.debug("submitting for split tablet:{} size:{} threshold:{}", tablet.getExtent(),
+              tabletSize, threshold);
+          manager.getSplitter().executeSplit(new SplitTask(context, tablet, manager));
+        }
+      }
+    }
+  }
+}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitTask.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitTask.java
new file mode 100644
index 0000000000..19ab1232ec
--- /dev/null
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitTask.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.split;
+
+import java.time.Duration;
+import java.util.SortedSet;
+
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.split.PreSplit;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SplitTask implements Runnable {
+
+  private static final Logger log = LoggerFactory.getLogger(SplitTask.class);
+  private final Manager manager;
+
+  private final ServerContext context;
+  private TabletMetadata tablet;
+  private final long creationTime;
+
+  public SplitTask(ServerContext context, TabletMetadata tablet, Manager manager) {
+    this.context = context;
+    this.tablet = tablet;
+    this.manager = manager;
+    this.creationTime = System.nanoTime();
+  }
+
+  @Override
+  public void run() {
+    try {
+      if (Duration.ofNanos(System.nanoTime() - creationTime).compareTo(Duration.ofMinutes(2)) > 0) {
+        // the tablet was in the thread pool queue for a bit, lets reread its metadata
+        tablet = manager.getContext().getAmple().readTablet(tablet.getExtent());
+        if (tablet == null) {
+          // the tablet no longer exists
+          return;
+        }
+      }
+
+      var extent = tablet.getExtent();
+
+      SortedSet<Text> splits = SplitUtils.findSplits(context, tablet);
+
+      if (tablet.getEndRow() != null) {
+        splits.remove(tablet.getEndRow());
+      }
+
+      if (splits.size() == 0) {
+        log.info("Tablet {} needs to split, but no split points could be found.",
+            tablet.getExtent());
+
+        manager.getSplitter().rememberUnsplittable(tablet);
+        manager.getSplitter().removeSplitStarting(tablet.getExtent());
+        return;
+      }
+
+      long fateTxId = manager.fate().startTransaction();
+
+      if (tablet.getLocation() != null) {
+        manager.requestUnassignment(tablet.getExtent(), fateTxId);
+      }
+
+      manager.fate().seedTransaction("SYSTEM_SPLIT", fateTxId, new PreSplit(extent, splits), true,
+          "System initiated split of tablet " + extent + " into " + splits.size() + " splits");
+    } catch (Exception e) {
+      log.error("Failed to split {}", tablet.getExtent(), e);
+    }
+  }
+}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitUtils.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitUtils.java
new file mode 100644
index 0000000000..f4c871324f
--- /dev/null
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitUtils.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.split;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.TabletFile;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+/**
+ * Static helpers that choose split points for a tablet by walking the index entries of its files.
+ */
+public class SplitUtils {
+
+  private static final Logger log = LoggerFactory.getLogger(SplitUtils.class);
+
+  /**
+   * Iterates over the keys of a sorted source, restricted to rows in the range (prevEndRow,
+   * endRow]. A null bound leaves that side of the range open.
+   */
+  static class IndexIterator implements Iterator<Key> {
+
+    private final SortedKeyValueIterator<Key,Value> source;
+
+    private final Text prevEndRow;
+    private final Text endRow;
+
+    public IndexIterator(SortedKeyValueIterator<Key,Value> source, Text endRow, Text prevEndRow) {
+      this.source = source;
+      this.prevEndRow = prevEndRow;
+      this.endRow = endRow;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (prevEndRow != null) {
+        // this code filters out data because the rfile index iterators do not support seek, so just
+        // discard everything before our point of interest.
+        while (source.hasTop() && source.getTopKey().getRow().compareTo(prevEndRow) <= 0) {
+          try {
+            source.next();
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        }
+      }
+
+      if (endRow != null) {
+        // keys past the end row are outside of the range, so iteration ends there
+        return source.hasTop() && source.getTopKey().getRow().compareTo(endRow) <= 0;
+      }
+
+      return source.hasTop();
+    }
+
+    @Override
+    public Key next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      Key key = source.getTopKey();
+
+      try {
+        source.next();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+
+      return key;
+    }
+  }
+
+  /**
+   * Opens an index reader for each of the given files.
+   *
+   * @return the opened readers, in the iteration order of {@code files}
+   * @throws IOException if opening a reader fails. Readers that were already opened are closed
+   *         before the exception propagates.
+   */
+  private static ArrayList<FileSKVIterator> openIndexes(ServerContext context,
+      TableConfiguration tableConf, Collection<StoredTabletFile> files) throws IOException {
+    ArrayList<FileSKVIterator> readers = new ArrayList<>();
+
+    try {
+      for (TabletFile file : files) {
+        Path path = file.getPath();
+        FileSystem ns = context.getVolumeManager().getFileSystemByPath(path);
+
+        FileSKVIterator reader = FileOperations.getInstance().newIndexReaderBuilder()
+            .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService())
+            .withTableConfiguration(tableConf).build();
+
+        readers.add(reader);
+      }
+    } catch (IOException | RuntimeException e) {
+      // Close whatever was opened before the failure. Unchecked exceptions are caught as well,
+      // otherwise the readers opened so far would be leaked.
+      readers.forEach(reader -> {
+        try {
+          reader.close();
+        } catch (IOException ce) {
+          log.debug("failed to close reader", ce);
+        }
+      });
+      throw e;
+    }
+
+    return readers;
+  }
+
+  /**
+   * An iterable over the index keys of a set of files, restricted to the rows of a tablet. Each
+   * call to {@link #iterator()} closes the readers opened by a previous call and opens new ones,
+   * so only the most recently returned iterator is usable. Call {@link #close()} when done.
+   */
+  public static class IndexIterable implements AutoCloseable, Iterable<Key> {
+
+    private final ServerContext context;
+    private final TableConfiguration tableConf;
+    private final Collection<StoredTabletFile> files;
+    private final Text prevEndRow;
+    private final Text endRow;
+    private ArrayList<FileSKVIterator> readers;
+
+    public IndexIterable(ServerContext context, TableConfiguration tableConf,
+        Collection<StoredTabletFile> files, Text endRow, Text prevEndRow) {
+      this.context = context;
+      this.tableConf = tableConf;
+      this.files = files;
+      this.prevEndRow = prevEndRow;
+      this.endRow = endRow;
+    }
+
+    /**
+     * @throws UncheckedIOException if the index readers can not be opened
+     */
+    @Override
+    public Iterator<Key> iterator() {
+      // release readers from an earlier iteration before opening new ones
+      close();
+      try {
+        readers = openIndexes(context, tableConf, files);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+
+      List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<>(readers);
+      MultiIterator mmfi = new MultiIterator(iters, true);
+
+      return new IndexIterator(mmfi, endRow, prevEndRow);
+    }
+
+    /**
+     * Closes all currently open readers. Failures to close are logged and otherwise ignored.
+     */
+    @Override
+    public void close() {
+      if (readers != null) {
+        readers.forEach(reader -> {
+          try {
+            reader.close();
+          } catch (IOException e) {
+            log.debug("Failed to close index reader", e);
+          }
+        });
+        readers = null;
+      }
+    }
+  }
+
+  /**
+   * Computes how many split points are wanted for a tablet.
+   *
+   * @param estimatedSize estimated size of the tablet
+   * @param splitThreshold size at which a tablet should split
+   * @return the estimated size divided by the threshold, rounded down
+   */
+  public static int calculateDesiredSplits(long estimatedSize, long splitThreshold) {
+    // ELASTICITY_TODO tablets used to always split into 2 tablets. Now the split operation will
+    // split into many. How does this impact a tablet with many files and the estimated sizes after
+    // split vs the old method. Need to run test where we add lots of data to a single tablet,
+    // change the split thresh, wait for splits, then look at the estimated sizes, then compact and
+    // look at the sizes after. For example if a tablet has 10M of data and the split thesh is set
+    // to 100K, what will the est sizes look like across the tablets after splitting and then after
+    // compacting?
+    return (int) Math.floor((double) estimatedSize / (double) splitThreshold);
+  }
+
+  /**
+   * Finds split points for a tablet using the index entries of its files.
+   *
+   * <p>
+   * The tablet size is estimated by summing the sizes recorded for its files. An empty set is
+   * returned when that estimate does not exceed the table's split threshold, or when the tablet
+   * has too many files to open. Candidate split points whose length reaches the table's maximum
+   * end row size are dropped.
+   *
+   * @return a mutable set of split points, possibly empty
+   */
+  public static SortedSet<Text> findSplits(ServerContext context, TabletMetadata tabletMetadata) {
+    var estimatedSize =
+        tabletMetadata.getFilesMap().values().stream().mapToLong(DataFileValue::getSize).sum();
+    var tableConf = context.getTableConfiguration(tabletMetadata.getTableId());
+    var threshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
+    var maxEndRowSize = tableConf.getAsBytes(Property.TABLE_MAX_END_ROW_SIZE);
+
+    // ELASTICITY_TODO rename and deprecate property. This is not executing in the tablet server
+    // anymore.
+    int maxFilesToOpen = tableConf.getCount(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN);
+
+    if (estimatedSize <= threshold) {
+      return new TreeSet<>();
+    }
+
+    if (tabletMetadata.getFiles().size() >= maxFilesToOpen) {
+      log.warn("Tablet {} has {} files which exceeds the max to open for split, so can not split.",
+          tabletMetadata.getExtent(), tabletMetadata.getFiles().size());
+      return new TreeSet<>();
+    }
+
+    try (var indexIterable = new IndexIterable(context, tableConf, tabletMetadata.getFiles(),
+        tabletMetadata.getEndRow(), tabletMetadata.getPrevEndRow())) {
+      var splits = findSplits(indexIterable, calculateDesiredSplits(estimatedSize, threshold));
+
+      splits.removeIf(split -> {
+        if (split.getLength() >= maxEndRowSize) {
+          log.warn("Ignoring split point for {} of length {}", tabletMetadata.getExtent(),
+              split.getLength());
+          return true;
+        }
+
+        return false;
+      });
+
+      return splits;
+    }
+  }
+
+  /**
+   * @return the length of the longest prefix that the two byte sequences have in common
+   */
+  private static int longestCommonLength(ByteSequence bs1, ByteSequence bs2) {
+    int common = 0;
+    while (common < bs1.length() && common < bs2.length()
+        && bs1.byteAt(common) == bs2.byteAt(common)) {
+      common++;
+    }
+    return common;
+  }
+
+  /**
+   * Picks split points that are spread evenly over the given keys.
+   *
+   * <p>
+   * The iterable is traversed twice, once to count the keys and once to pick the split points, so
+   * it must return the same keys on each traversal. Where possible a split point is shortened to
+   * the shortest prefix of the chosen row that still differs from the preceding distinct row.
+   *
+   * @param tabletIndexIterator keys in sorted order
+   * @param desiredSplits number of split points wanted, must be at least one
+   * @return at most {@code desiredSplits} split points. Fewer are returned when there are not
+   *         enough keys or when chosen split points are equal.
+   * @throws IllegalArgumentException if {@code desiredSplits} is less than one
+   */
+  public static SortedSet<Text> findSplits(Iterable<Key> tabletIndexIterator, int desiredSplits) {
+    Preconditions.checkArgument(desiredSplits >= 1);
+
+    int numKeys = Iterables.size(tabletIndexIterator);
+
+    // number of keys between consecutive split points when the keys are divided evenly
+    double interSplitDistance = (double) numKeys / (double) (desiredSplits + 1);
+
+    SortedSet<Text> splits = new TreeSet<>();
+
+    long count = 0;
+
+    // prevRow is the most recently seen row that differs from the row of the current key, lastRow
+    // is the row of the key seen in the previous loop iteration
+    ByteSequence prevRow = null;
+    ByteSequence lastRow = null;
+
+    for (Key key : tabletIndexIterator) {
+      if (lastRow != null && !key.getRowData().equals(lastRow)) {
+        prevRow = lastRow;
+      }
+
+      count++;
+      if (count >= Math.round((splits.size() + 1) * interSplitDistance)) {
+        if (prevRow == null) {
+          // no earlier distinct row to compare against, so the row can not be shortened
+          splits.add(key.getRow());
+        } else {
+          var lcl = longestCommonLength(prevRow, key.getRowData());
+          if (lcl + 1 >= key.getRowData().length()) {
+            splits.add(key.getRow());
+          } else {
+            // one byte past the common prefix is enough to tell the row apart from prevRow
+            splits.add(new Text(key.getRowData().subSequence(0, lcl + 1).toArray()));
+          }
+
+        }
+
+        if (splits.size() >= desiredSplits) {
+          break;
+        }
+      }
+
+      lastRow = key.getRowData();
+    }
+
+    return splits;
+  }
+}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
new file mode 100644
index 0000000000..8f5629930a
--- /dev/null
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.split;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.TabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.util.FileUtil;
+import org.apache.accumulo.server.util.FileUtil.FileInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Weigher;
+import com.google.common.base.Preconditions;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hashing;
+
+public class Splitter {
+
+  private static final Logger log = LoggerFactory.getLogger(Splitter.class);
+
+  private final ServerContext context;
+  private final Ample.DataLevel level;
+
+  private final ExecutorService splitExecutor;
+
+  private final ScheduledExecutorService scanExecutor;
+  private final Manager manager;
+  private ScheduledFuture<?> scanFuture;
+
+  Cache<KeyExtent,KeyExtent> splitsStarting;
+
+  Cache<KeyExtent,HashCode> unsplittable;
+
+  private static class CacheKey {
+
+    final TableId tableId;
+    final TabletFile tabletFile;
+
+    public CacheKey(TableId tableId, TabletFile tabletFile) {
+      this.tableId = tableId;
+      this.tabletFile = tabletFile;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      CacheKey cacheKey = (CacheKey) o;
+      return Objects.equals(tableId, cacheKey.tableId)
+          && Objects.equals(tabletFile, cacheKey.tabletFile);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(tableId, tabletFile);
+    }
+
+  }
+
+  LoadingCache<CacheKey,FileInfo> splitFileCache;
+
+  private static int weigh(KeyExtent keyExtent) {
+    int size = 0;
+    size += keyExtent.tableId().toString().length();
+    if (keyExtent.endRow() != null) {
+      size += keyExtent.endRow().getLength();
+    }
+    if (keyExtent.prevEndRow() != null) {
+      size += keyExtent.prevEndRow().getLength();
+    }
+    return size;
+  }
+
+  public Splitter(ServerContext context, Ample.DataLevel level, Manager manager) {
+    this.context = context;
+    this.level = level;
+    this.manager = manager;
+    this.splitExecutor = context.threadPools().createExecutorService(context.getConfiguration(),
+        Property.MANAGER_SPLIT_WORKER_THREADS, true);
+    this.scanExecutor =
+        context.threadPools().createScheduledExecutorService(1, "Tablet Split Scanner", true);
+
+    Weigher<CacheKey,FileInfo> weigher =
+        (key, info) -> key.tableId.canonical().length() + key.tabletFile.getPathStr().length()
+            + info.getFirstRow().getLength() + info.getLastRow().getLength();
+
+    CacheLoader<CacheKey,FileInfo> loader = new CacheLoader<>() {
+      @Override
+      public FileInfo load(CacheKey key) throws Exception {
+        TableConfiguration tableConf = context.getTableConfiguration(key.tableId);
+        return FileUtil.tryToGetFirstAndLastRows(context, tableConf, Set.of(key.tabletFile))
+            .get(key.tabletFile);
+      }
+    };
+
+    splitFileCache = Caffeine.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES)
+        .maximumWeight(10_000_000L).weigher(weigher).build(loader);
+
+    Weigher<KeyExtent,KeyExtent> weigher2 = (keyExtent, keyExtent2) -> weigh(keyExtent);
+
+    // Tracks splits starting, but not forever in case something in the code does not remove it.
+    splitsStarting = Caffeine.newBuilder().expireAfterAccess(3, TimeUnit.HOURS)
+        .maximumWeight(10_000_000L).weigher(weigher2).build();
+
+    Weigher<KeyExtent,HashCode> weigher3 = (keyExtent, hc) -> {
+      return weigh(keyExtent) + hc.bits() / 8;
+    };
+
+    unsplittable = Caffeine.newBuilder().expireAfterAccess(24, TimeUnit.HOURS)
+        .maximumWeight(10_000_000L).weigher(weigher3).build();
+  }
+
+  public synchronized void start() {
+    Preconditions.checkState(scanFuture == null);
+    Preconditions.checkState(!scanExecutor.isShutdown());
+    // ELASTICITY_TODO make this configurable if functionality is not moved elsewhere
+    scanFuture = scanExecutor.scheduleWithFixedDelay(new SplitScanner(context, level, manager), 1,
+        10, TimeUnit.SECONDS);
+  }
+
+  public synchronized void stop() {
+    scanFuture.cancel(true);
+    scanExecutor.shutdownNow();
+    splitExecutor.shutdownNow();
+  }
+
+  public FileInfo getCachedFileInfo(TableId tableId, TabletFile tabletFile) {
+    return splitFileCache.get(new CacheKey(tableId, tabletFile));
+  }
+
+  private HashCode caclulateFilesHash(TabletMetadata tabletMetadata) {
+    var hasher = Hashing.goodFastHash(128).newHasher();
+    tabletMetadata.getFiles().stream().map(StoredTabletFile::getPathStr).sorted()
+        .forEach(path -> hasher.putString(path, UTF_8));
+    return hasher.hash();
+  }
+
+  /**
+   * This tablet met the criteria for split but inspection could not find a split point. Remember
+   * this to avoid wasting time on future inspections until its files change.
+   */
+  public void rememberUnsplittable(TabletMetadata tablet) {
+    unsplittable.put(tablet.getExtent(), caclulateFilesHash(tablet));
+  }
+
+  /**
+   * Determines if further inspection should be done on a tablet that meets the criteria for splits.
+   */
+  public boolean shouldInspect(TabletMetadata tablet) {
+    if (splitsStarting.getIfPresent(tablet.getExtent()) != null) {
+      return false;
+    }
+
+    var hashCode = unsplittable.getIfPresent(tablet.getExtent());
+
+    if (hashCode != null) {
+      if (hashCode.equals(caclulateFilesHash(tablet))) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Temporarily remember that the process of splitting is starting for this tablet making
+   * {@link #shouldInspect(TabletMetadata)} return false in the future.
+   */
+  public boolean addSplitStarting(KeyExtent extent) {
+    Objects.requireNonNull(extent);
+    return splitsStarting.asMap().put(extent, extent) == null;
+  }
+
+  public void removeSplitStarting(KeyExtent extent) {
+    splitsStarting.invalidate(extent);
+  }
+
+  public void executeSplit(SplitTask splitTask) {
+    splitExecutor.execute(splitTask);
+  }
+}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java
index 05df4245ee..19a90c67cd 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java
@@ -93,7 +93,7 @@ class PopulateMetadata extends ManagerRepo {
         tabletMutator.putDirName(dirName);
         tabletMutator.putTime(new MetadataTime(0, tableInfo.getTimeType()));
         tabletMutator.putZooLock(lock);
-        tabletMutator.setHostingGoal(tableInfo.getInitialHostingGoal());
+        tabletMutator.putHostingGoal(tableInfo.getInitialHostingGoal());
         tabletMutator.mutate();
 
         prevSplit = split;
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/goal/SetHostingGoal.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/goal/SetHostingGoal.java
index 25351060b7..a508324f76 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/goal/SetHostingGoal.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/goal/SetHostingGoal.java
@@ -109,7 +109,7 @@ public class SetHostingGoal extends ManagerRepo {
         }
 
         LOG.debug("Setting tablet hosting goal to {} requested for: {} ", goal, tabletExtent);
-        mutator.mutateTablet(tabletExtent).setHostingGoal(goal).mutate();
+        mutator.mutateTablet(tabletExtent).putHostingGoal(goal).mutate();
       }
     }
     Utils.unreserveNamespace(manager, namespaceId, tid, false);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java
new file mode 100644
index 0000000000..181bc71e62
--- /dev/null
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.tableOps.split;
+
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.metadata.schema.TabletOperationType;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
+
+/**
+ * FATE step that removes the split operation id of this transaction from the tablets listed in
+ * the split info.
+ */
+public class DeleteOperationIds extends ManagerRepo {
+  private static final long serialVersionUID = 1L;
+  private final SplitInfo splitInfo;
+
+  public DeleteOperationIds(SplitInfo splitInfo) {
+    this.splitInfo = splitInfo;
+  }
+
+  /**
+   * Conditionally deletes the operation id from every tablet of the split.
+   *
+   * @return null, no further step is returned
+   * @throws IllegalStateException if any of the conditional mutations was not accepted
+   */
+  @Override
+  public Repo<Manager> call(long tid, Manager manager) throws Exception {
+
+    var splitOpId = TabletOperationId.from(TabletOperationType.SPLITTING, tid);
+
+    // When the outcome of a mutation is unknown, treat it as successful as long as the tablet does
+    // not carry our operation id. If this repo already ran and deleted the id, the id may now be
+    // absent or may have been set by a different fate operation.
+    Ample.UnknownValidator opIdNotOurs = tabletMetadata -> {
+      var current = tabletMetadata.getOperationId();
+      return current == null || !current.equals(splitOpId);
+    };
+
+    try (var mutator = manager.getContext().getAmple().conditionallyMutateTablets()) {
+
+      splitInfo.getTablets().forEach(extent -> mutator.mutateTablet(extent)
+          .requireOperation(splitOpId).deleteOperation().submit(opIdNotOurs));
+
+      var results = mutator.process();
+
+      boolean anyRejected = results.values().stream()
+          .anyMatch(result -> result.getStatus() != ConditionalWriter.Status.ACCEPTED);
+
+      if (anyRejected) {
+        // ELASTICITY_TODO not handling the case where running a 2nd time in the case of failures.
+        // Fix this when improving unknown handling.
+        var statuses = results.values().stream().map(Ample.ConditionalResult::getStatus)
+            .collect(Collectors.toSet());
+        throw new IllegalStateException(
+            "Failed to delete operation ids " + splitInfo.getOriginal() + " " + statuses);
+      }
+    }
+
+    return null;
+  }
+}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
new file mode 100644
index 0000000000..9d8b853fba
--- /dev/null
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.tableOps.split;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.SortedSet;
+
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.metadata.schema.TabletOperationType;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.apache.accumulo.server.tablets.UniqueNameAllocator;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * First step of the split FATE operation. It reserves the tablet by setting a split operation id
+ * on it and allocates the directory names used by the next step.
+ */
+public class PreSplit extends ManagerRepo {
+  private static final long serialVersionUID = 1L;
+  private static final Logger log = LoggerFactory.getLogger(PreSplit.class);
+
+  private final SplitInfo splitInfo;
+
+  /**
+   * @param expectedExtent extent of the tablet to split, must not be the root tablet
+   * @param splits split points to add, must not be empty
+   * @throws NullPointerException if an argument is null
+   * @throws IllegalArgumentException if {@code splits} is empty or the extent is the root tablet
+   */
+  public PreSplit(KeyExtent expectedExtent, SortedSet<Text> splits) {
+    Objects.requireNonNull(expectedExtent);
+    Objects.requireNonNull(splits);
+    Preconditions.checkArgument(!splits.isEmpty());
+    Preconditions.checkArgument(!expectedExtent.isRootTablet());
+    this.splitInfo = new SplitInfo(expectedExtent, splits);
+  }
+
+  /**
+   * Attempts to set the split operation id on the tablet using a conditional mutation that
+   * requires the tablet to have no operation id, no location and the expected previous end row.
+   *
+   * @return 0 when {@link #call(long, Manager)} should run next. That is the case when the
+   *         operation id was set, and also when it could not be set for a reason other than the
+   *         tablet having a location. Returns 2000 when the tablet still has a location, after
+   *         requesting its unassignment, so that this check is repeated later (presumably a delay
+   *         in milliseconds, confirm against the FATE Repo contract).
+   */
+  @Override
+  public long isReady(long tid, Manager manager) throws Exception {
+
+    // ELASTICITY_TODO intentionally not getting the table lock because not sure if its needed,
+    // revisit later when more operations are moved out of tablet server
+
+    var opid = TabletOperationId.from(TabletOperationType.SPLITTING, tid);
+
+    // ELASTICITY_TODO write IT that spins up 100 threads that all try to add a diff split to
+    // the same tablet.
+
+    // ELASTICITY_TODO does FATE prioritize running Fate txs that have already started? If not would
+    // be good to look into this so we can finish things that are started before running new txs
+    // that have not completed their first step. Once splits starts running, would like it to move
+    // through as quickly as possible.
+
+    try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) {
+
+      // when the status of the mutation is unknown, it is considered successful if the tablet has
+      // our operation id
+      tabletsMutator.mutateTablet(splitInfo.getOriginal()).requireAbsentOperation()
+          .requireAbsentLocation().requirePrevEndRow(splitInfo.getOriginal().prevEndRow())
+          .putOperation(opid)
+          .submit(tmeta -> tmeta.getOperationId() != null && tmeta.getOperationId().equals(opid));
+
+      Map<KeyExtent,Ample.ConditionalResult> results = tabletsMutator.process();
+
+      if (results.get(splitInfo.getOriginal()).getStatus() == ConditionalWriter.Status.ACCEPTED) {
+        log.trace("{} reserved {} for split", FateTxId.formatTid(tid), splitInfo.getOriginal());
+        return 0;
+      } else {
+        var tabletMetadata = results.get(splitInfo.getOriginal()).readMetadata();
+
+        // its possible the tablet no longer exists
+        var optMeta = Optional.ofNullable(tabletMetadata);
+
+        log.trace("{} Failed to set operation id. extent:{} location:{} opid:{}",
+            FateTxId.formatTid(tid), splitInfo.getOriginal(),
+            optMeta.map(TabletMetadata::getLocation).orElse(null),
+            optMeta.map(TabletMetadata::getOperationId).orElse(null));
+
+        if (tabletMetadata != null && tabletMetadata.getLocation() != null) {
+          // the tablet exists but has a location, lets try again later
+          manager.requestUnassignment(tabletMetadata.getExtent(), tid);
+          return 2000;
+        } else {
+          // The tablet may no longer exist, another operation may have it reserved, or maybe we
+          // already reserved and a fault happened. In any case lets proceed, the tablet will be
+          // checked in the call() function and it will sort everything out.
+          return 0;
+        }
+      }
+    }
+  }
+
+  /**
+   * Verifies that the tablet is reserved by this transaction and allocates one directory name per
+   * split point.
+   *
+   * @return the next step, or null when the tablet no longer exists or does not carry the
+   *         operation id of this transaction, in which case the split is not performed
+   */
+  @Override
+  public Repo<Manager> call(long tid, Manager manager) throws Exception {
+    // ELASTICITY_TODO need to make manager ignore tablet with an operation id for assignment
+    // purposes
+    manager.cancelUnassignmentRequest(splitInfo.getOriginal(), tid);
+    manager.getSplitter().removeSplitStarting(splitInfo.getOriginal());
+
+    TabletMetadata tabletMetadata = manager.getContext().getAmple().readTablet(
+        splitInfo.getOriginal(), ColumnType.PREV_ROW, ColumnType.LOCATION, ColumnType.OPID);
+
+    var opid = TabletOperationId.from(TabletOperationType.SPLITTING, tid);
+
+    // The tablet may exist without any operation id, so compare starting from opid, which is never
+    // null, to avoid a NullPointerException.
+    if (tabletMetadata == null || !opid.equals(tabletMetadata.getOperationId())) {
+      // the tablet no longer exists or we could not set the operation id, maybe another operation
+      // was running, lets not proceed with the split.
+      var optMeta = Optional.ofNullable(tabletMetadata);
+      log.trace("{} Not proceeding with split. extent:{} location:{} opid:{}",
+          FateTxId.formatTid(tid), splitInfo.getOriginal(),
+          optMeta.map(TabletMetadata::getLocation).orElse(null),
+          optMeta.map(TabletMetadata::getOperationId).orElse(null));
+      return null;
+    }
+
+    // Create the dir name here for the next step. If the next step fails it will always have the
+    // same dir name each time it runs again making it idempotent.
+
+    List<String> dirs = new ArrayList<>();
+
+    splitInfo.getSplits().forEach(split -> {
+      String dirName = UniqueNameAllocator.createTabletDirectoryName(manager.getContext(), split);
+      dirs.add(dirName);
+      log.trace("{} allocated dir name {}", FateTxId.formatTid(tid), dirName);
+    });
+
+    return new UpdateTablets(splitInfo, dirs);
+  }
+
+  /**
+   * Cancels the unassignment request made for this transaction and forgets that a split was
+   * starting for the tablet. The operation id is not removed here.
+   */
+  @Override
+  public void undo(long tid, Manager manager) throws Exception {
+    // TODO is this called if isReady fails?
+    // TODO should operation id be cleaned up? maybe not, tablet may be in an odd state
+    manager.cancelUnassignmentRequest(splitInfo.getOriginal(), tid);
+    manager.getSplitter().removeSplitStarting(splitInfo.getOriginal());
+  }
+}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java
new file mode 100644
index 0000000000..52051a0dc0
--- /dev/null
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.tableOps.split;
+
+import java.io.Serializable;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Preconditions;
+
+public class SplitInfo implements Serializable {
+  private static final long serialVersionUID = 1L;
+  // Rows are held as raw bytes, presumably because Hadoop Text is not java.io.Serializable.
+  private final TableId tableId;
+  private final byte[] prevEndRow;
+  private final byte[] endRow;
+  private final byte[][] splits;
+
+  SplitInfo(KeyExtent extent, SortedSet<Text> splits) {
+    this.tableId = extent.tableId();
+    this.prevEndRow = extent.prevEndRow() == null ? null : TextUtil.getBytes(extent.prevEndRow());
+    this.endRow = extent.endRow() == null ? null : TextUtil.getBytes(extent.endRow());
+    this.splits = new byte[splits.size()][];
+    // copy the split rows in sorted order, rejecting any row the extent does not contain
+    int index = 0;
+    for (var split : splits) {
+      Preconditions.checkArgument(extent.contains(split));
+      this.splits[index] = TextUtil.getBytes(split);
+      index++;
+    }
+  }
+
+  private static Text toText(byte[] bytes) {
+    return bytes == null ? null : new Text(bytes);
+  }
+  /** Rebuilds the extent of the tablet being split; each call creates a new KeyExtent. */
+  KeyExtent getOriginal() {
+    return new KeyExtent(tableId, toText(endRow), toText(prevEndRow));
+  }
+  /** Returns the split rows as a new sorted set of new Text objects. */
+  SortedSet<Text> getSplits() {
+    SortedSet<Text> splits = new TreeSet<>();
+    for (byte[] split : this.splits) {
+      splits.add(new Text(split));
+    }
+    return splits;
+  }
+  /** Returns the extents the original tablet is divided into as a sorted set. */
+  SortedSet<KeyExtent> getTablets() {
+    // Build the original extent once; getOriginal() allocates a new KeyExtent on every call.
+    KeyExtent original = getOriginal();
+
+    Text prev = original.prevEndRow();
+
+    TreeSet<KeyExtent> tablets = new TreeSet<>();
+
+    for (var split : getSplits()) {
+      var extent = new KeyExtent(original.tableId(), split, prev);
+      prev = split;
+      tablets.add(extent);
+    }
+    // the final tablet runs from the last split to the original end row
+    var extent = new KeyExtent(original.tableId(), original.endRow(), prev);
+    tablets.add(extent);
+
+    return tablets;
+  }
+
+}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
new file mode 100644
index 0000000000..09db1e11f8
--- /dev/null
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.tableOps.split;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.metadata.schema.TabletOperationType;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.apache.accumulo.server.util.FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+
+public class UpdateTablets extends ManagerRepo {
+  private static final Logger log = LoggerFactory.getLogger(UpdateTablets.class);
+  private static final long serialVersionUID = 1L;
+  private final SplitInfo splitInfo;
+  private final List<String> dirNames;
+  // dirNames must supply a directory name for every new tablet that addNewTablets creates
+  public UpdateTablets(SplitInfo splitInfo, List<String> dirNames) {
+    this.splitInfo = splitInfo;
+    this.dirNames = dirNames;
+  }
+  /** Creates the new tablets and then shrinks the original tablet to the last new extent. */
+  @Override
+  public Repo<Manager> call(long tid, Manager manager) throws Exception {
+    TabletMetadata tabletMetadata =
+        manager.getContext().getAmple().readTablet(splitInfo.getOriginal());
+
+    var opid = TabletOperationId.from(TabletOperationType.SPLITTING, tid);
+
+    if (tabletMetadata == null) {
+      // check to see if this operation has already succeeded.
+      TabletMetadata newTabletMetadata =
+          manager.getContext().getAmple().readTablet(splitInfo.getTablets().last());
+
+      if (newTabletMetadata != null && opid.equals(newTabletMetadata.getOperationId())) {
+        // have already created the new tablet and failed before we could return the next step, so
+        // lets go ahead and return the next step.
+        log.trace(
+            "{} creating new tablet was rejected because it existed, operation probably failed before.",
+            FateTxId.formatTid(tid));
+        return new DeleteOperationIds(splitInfo);
+      } else {
+        throw new IllegalStateException("Tablet is in an unexpected condition "
+            + splitInfo.getOriginal() + " " + (newTabletMetadata == null) + " "
+            + (newTabletMetadata == null ? null : newTabletMetadata.getOperationId()));
+      }
+    }
+    // compare from opid so a missing operation id fails the check rather than throwing an NPE
+    Preconditions.checkState(opid.equals(tabletMetadata.getOperationId()),
+        "Tablet %s does not have expected operation id %s it has %s", splitInfo.getOriginal(), opid,
+        tabletMetadata.getOperationId());
+
+    var newTablets = splitInfo.getTablets();
+
+    var newTabletsFiles = getNewTabletFiles(newTablets, tabletMetadata,
+        file -> manager.getSplitter().getCachedFileInfo(splitInfo.getOriginal().tableId(), file));
+
+    addNewTablets(tid, manager, tabletMetadata, opid, newTablets, newTabletsFiles);
+
+    // Only update the original tablet after successfully creating the new tablets, this is
+    // important for failure cases where this operation partially runs and then runs again.
+
+    updateExistingTablet(manager, tabletMetadata, opid, newTablets, newTabletsFiles);
+
+    return new DeleteOperationIds(splitInfo);
+  }
+
+  /**
+   * Determine which files from the original tablet go to each new tablet being created by the
+   * split. A file's size and entry estimates are divided evenly among the tablets it overlaps.
+   */
+  static Map<KeyExtent,Map<StoredTabletFile,DataFileValue>> getNewTabletFiles(
+      Set<KeyExtent> newTablets, TabletMetadata tabletMetadata,
+      Function<StoredTabletFile,FileUtil.FileInfo> fileInfoProvider) {
+
+    Map<KeyExtent,Map<StoredTabletFile,DataFileValue>> tabletsFiles = new TreeMap<>();
+
+    newTablets.forEach(extent -> tabletsFiles.put(extent, new HashMap<>()));
+
+    // determine which files overlap which tablets and their estimated sizes
+    tabletMetadata.getFilesMap().forEach((file, dataFileValue) -> {
+      FileUtil.FileInfo fileInfo = fileInfoProvider.apply(file);
+
+      Range fileRange;
+      if (fileInfo != null) {
+        fileRange = new Range(fileInfo.getFirstRow(), fileInfo.getLastRow());
+      } else {
+        fileRange = new Range(); // no info for this file, so assume it could span any row
+      }
+
+      // count how many of the new tablets the file will overlap
+      double numOverlapping = newTablets.stream().map(KeyExtent::toDataRange)
+          .filter(range -> range.clip(fileRange, true) != null).count();
+
+      Preconditions.checkState(numOverlapping > 0);
+
+      // evenly split the file's estimates among the tablets it actually overlaps
+      double sizePerTablet = dataFileValue.getSize() / numOverlapping;
+      double entriesPerTablet = dataFileValue.getNumEntries() / numOverlapping;
+
+      // add the file to the tablets it overlaps
+      newTablets.forEach(newTablet -> {
+        if (newTablet.toDataRange().clip(fileRange, true) != null) {
+          DataFileValue ndfv = new DataFileValue((long) sizePerTablet, (long) entriesPerTablet,
+              dataFileValue.getTime());
+          tabletsFiles.get(newTablet).put(file, ndfv);
+        }
+      });
+    });
+
+    if (log.isTraceEnabled()) {
+      tabletMetadata.getFilesMap().forEach((f, v) -> {
+        log.trace("{} original file {} {} {}", tabletMetadata.getExtent(), f.getFileName(),
+            v.getSize(), v.getNumEntries());
+      });
+
+      tabletsFiles.forEach((extent, files) -> {
+        files.forEach((f, v) -> {
+          log.trace("{} split file {} {} {}", extent, f.getFileName(), v.getSize(),
+              v.getNumEntries());
+        });
+      });
+    }
+
+    return tabletsFiles;
+  }
+  /** Conditionally creates each new tablet except the last, copying over the original's state. */
+  private void addNewTablets(long tid, Manager manager, TabletMetadata tabletMetadata,
+      TabletOperationId opid, SortedSet<KeyExtent> newTablets,
+      Map<KeyExtent,Map<StoredTabletFile,DataFileValue>> newTabletsFiles) {
+    Iterator<String> dirNameIter = dirNames.iterator();
+
+    try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) {
+      for (var newExtent : newTablets) {
+        if (newExtent.equals(newTablets.last())) {
+          // Skip the last tablet, it is handled by updateExistingTablet.
+          continue;
+        }
+
+        var mutator = tabletsMutator.mutateTablet(newExtent).requireAbsentTablet();
+
+        mutator.putOperation(opid);
+        mutator.putDirName(dirNameIter.next());
+        mutator.putTime(tabletMetadata.getTime());
+        tabletMetadata.getFlushId().ifPresent(mutator::putFlushId);
+        mutator.putPrevEndRow(newExtent.prevEndRow());
+        tabletMetadata.getCompactId().ifPresent(mutator::putCompactionId);
+        mutator.putHostingGoal(tabletMetadata.getHostingGoal());
+
+        tabletMetadata.getLoaded().forEach(mutator::putBulkFile);
+        tabletMetadata.getLogs().forEach(mutator::putWal);
+
+        newTabletsFiles.get(newExtent).forEach(mutator::putFile);
+
+        mutator.submit(afterMeta -> afterMeta.getOperationId() != null
+            && afterMeta.getOperationId().equals(opid));
+      }
+
+      var results = tabletsMutator.process();
+      results.values().forEach(result -> {
+        var status = result.getStatus();
+        if (status == ConditionalWriter.Status.REJECTED) {
+          // lets see if this was rejected because this operation is running again in the case of
+          // failure
+          var newTabletMetadata = result.readMetadata();
+          if (newTabletMetadata != null && opid.equals(newTabletMetadata.getOperationId())) {
+            log.trace(
+                "{} {} creating new tablet was rejected because it existed, operation probably failed before.",
+                FateTxId.formatTid(tid), result.getExtent());
+            return;
+          }
+        }
+
+        Preconditions.checkState(status == ConditionalWriter.Status.ACCEPTED,
+            "Failed to add new tablet %s %s %s", status, splitInfo.getOriginal(),
+            result.getExtent());
+      });
+    }
+  }
+  /** Shrinks the original tablet to the last new extent and removes files it no longer needs. */
+  private void updateExistingTablet(Manager manager, TabletMetadata tabletMetadata,
+      TabletOperationId opid, SortedSet<KeyExtent> newTablets,
+      Map<KeyExtent,Map<StoredTabletFile,DataFileValue>> newTabletsFiles) {
+    try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) {
+      var newExtent = newTablets.last();
+
+      var mutator = tabletsMutator.mutateTablet(splitInfo.getOriginal()).requireOperation(opid)
+          .requirePrevEndRow(splitInfo.getOriginal().prevEndRow());
+
+      mutator.putPrevEndRow(newExtent.prevEndRow());
+
+      newTabletsFiles.get(newExtent).forEach(mutator::putFile);
+
+      // remove the files from the original tablet that did not end up in the tablet
+      tabletMetadata.getFiles().forEach(existingFile -> {
+        if (!newTabletsFiles.get(newExtent).containsKey(existingFile)) {
+          mutator.deleteFile(existingFile);
+        }
+      });
+
+      mutator.submit();
+
+      var result = tabletsMutator.process().get(splitInfo.getOriginal());
+
+      if (result.getStatus() == ConditionalWriter.Status.UNKNOWN) {
+        // Can not use Ample's built in code for checking unknown because we are changing the prev
+        // end row, so must check it manually
+
+        var tabletMeta = manager.getContext().getAmple().readTablet(newExtent);
+
+        if (tabletMeta == null || !opid.equals(tabletMeta.getOperationId())) {
+          // ELASTICITY_TODO need to retry when an UNKNOWN condition is seen, its possible the
+          // mutation never made it to the tserver. May want ample to always retry on unknown and
+          // change the unknown handler to a rejected handler.
+          throw new IllegalStateException("Failed to update existing tablet in split "
+              + splitInfo.getOriginal() + " " + result.getStatus() + " " + result.getExtent());
+        }
+      } else if (result.getStatus() != ConditionalWriter.Status.ACCEPTED) {
+        // maybe this step is being run again and the update was already made
+        throw new IllegalStateException("Failed to update existing tablet in split "
+            + splitInfo.getOriginal() + " " + result.getStatus() + " " + result.getExtent());
+      }
+    }
+  }
+}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
index 6aa82e278c..1d5a56804d 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
@@ -57,7 +57,7 @@ public class Upgrader11to12 implements Upgrader {
         TabletsMetadata tm =
             context.getAmple().readTablets().forTable(tableId).fetch(ColumnType.PREV_ROW).build();
         TabletsMutator mut = context.getAmple().mutateTablets()) {
-      tm.forEach(t -> mut.mutateTablet(t.getExtent()).setHostingGoal(TabletHostingGoal.ALWAYS));
+      tm.forEach(t -> mut.mutateTablet(t.getExtent()).putHostingGoal(TabletHostingGoal.ALWAYS));
     }
   }
 
@@ -74,7 +74,7 @@ public class Upgrader11to12 implements Upgrader {
         TabletsMetadata tm = context.getAmple().readTablets().forLevel(DataLevel.USER)
             .fetch(ColumnType.PREV_ROW).build();
         TabletsMutator mut = context.getAmple().mutateTablets()) {
-      tm.forEach(t -> mut.mutateTablet(t.getExtent()).setHostingGoal(TabletHostingGoal.ONDEMAND));
+      tm.forEach(t -> mut.mutateTablet(t.getExtent()).putHostingGoal(TabletHostingGoal.ONDEMAND));
     }
   }
 
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitUtilsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitUtilsTest.java
new file mode 100644
index 0000000000..66684a5333
--- /dev/null
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitUtilsTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.split;
+
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iteratorsImpl.system.SortedMapIterator;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.Iterators;
+
+public class SplitUtilsTest {
+
+  private static Key newKey(int i) {
+    return new Key(String.format("%04d", i));
+  }
+  // expected rows for the given ints, each shortened relative to the row for i - 1
+  private static SortedSet<Text> newRowsSet(int... rows) {
+    return newRowsSet(IntStream.of(rows));
+  }
+
+  private static SortedSet<Text> newRowsSet(Function<Integer,Integer> prevFunc, int... rows) {
+    return newRowsSet(IntStream.of(rows), prevFunc);
+  }
+
+  private static SortedSet<Text> newRowsSet(IntStream intStream) {
+    return newRowsSet(intStream, i -> i - 1);
+  }
+  // Truncates each %04d row to one char past the prefix it shares with prevFunc's row, if any.
+  private static SortedSet<Text> newRowsSet(IntStream intStream,
+      Function<Integer,Integer> prevFunc) {
+    var iter = intStream.iterator();
+
+    TreeSet<Text> rows = new TreeSet<>();
+
+    while (iter.hasNext()) {
+      var i = iter.next();
+      var s = String.format("%04d", i);
+
+      if (prevFunc.apply(i) != null) {
+        String prev = String.format("%04d", prevFunc.apply(i));
+
+        int common = 0;
+        for (; common < s.length(); common++) {
+          if (prev.charAt(common) != s.charAt(common)) {
+            break;
+          }
+        }
+
+        if (common + 1 < s.length()) {
+          s = s.substring(0, common + 1);
+        }
+      }
+
+      rows.add(new Text(s));
+    }
+
+    return rows;
+  }
+
+  private static SortedSet<Text> newRowSet(String... rows) {
+    return Stream.of(rows).map(Text::new).collect(toCollection(TreeSet::new));
+  }
+
+  private static Iterable<Key> newIndexIterable(IntStream intStream, Integer endRow,
+      Integer prevEndRow) {
+    return newIndexIterable(intStream.mapToObj(i -> String.format("%04d", i)),
+        endRow == null ? null : String.format("%04d", endRow),
+        prevEndRow == null ? null : String.format("%04d", prevEndRow));
+  }
+
+  private static Iterable<Key> newIndexIterable(Stream<String> stream, String endRow,
+      String prevEndRow) {
+    TreeMap<Key,Value> data = new TreeMap<>();
+
+    stream.forEach(row -> data.put(new Key(row), new Value("")));
+    Text er = endRow == null ? null : new Text(endRow);
+    Text pr = prevEndRow == null ? null : new Text(prevEndRow);
+
+    return () -> {
+      var accumuloIter = new SortedMapIterator(data);
+      try {
+        accumuloIter.seek(new Range(), Set.of(), false);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+      return new SplitUtils.IndexIterator(accumuloIter, er, pr);
+    };
+  }
+
+  @Test
+  public void testFindSplits() {
+    List<Key> keys = IntStream.range(1, 101).mapToObj(SplitUtilsTest::newKey).collect(toList());
+    assertEquals(newRowsSet(50), SplitUtils.findSplits(keys, 1));
+    assertEquals(newRowsSet(33, 67), SplitUtils.findSplits(keys, 2));
+    assertEquals(newRowsSet(25, 50, 75), SplitUtils.findSplits(keys, 3));
+    assertEquals(newRowsSet(20, 40, 60, 80), SplitUtils.findSplits(keys, 4));
+    assertEquals(newRowsSet(17, 33, 50, 67, 83), SplitUtils.findSplits(keys, 5));
+    assertEquals(newRowsSet(14, 29, 43, 57, 71, 86), SplitUtils.findSplits(keys, 6));
+    assertEquals(newRowsSet(13, 25, 38, 50, 63, 75, 88), SplitUtils.findSplits(keys, 7));
+    assertEquals(newRowsSet(IntStream.range(1, 10).map(i -> i * 10)),
+        SplitUtils.findSplits(keys, 9));
+    assertEquals(newRowsSet(IntStream.range(1, 20).map(i -> i * 5)),
+        SplitUtils.findSplits(keys, 19));
+    assertEquals(newRowsSet(IntStream.range(1, 50).map(i -> i * 2)),
+        SplitUtils.findSplits(keys, 49));
+    assertEquals(newRowsSet(IntStream.range(1, 100)), SplitUtils.findSplits(keys, 99));
+    assertEquals(newRowsSet(IntStream.range(1, 101)), SplitUtils.findSplits(keys, 100));
+    assertEquals(newRowsSet(IntStream.range(1, 101)), SplitUtils.findSplits(keys, 1000));
+  }
+
+  @Test
+  public void testFindSplitsFewRows() {
+    List<Key> keys = IntStream.range(1, 101).map(i -> i / 10 * 10).mapToObj(SplitUtilsTest::newKey)
+        .collect(toList());
+    // the keys contain only the rows 0000, 0010, ... 0100, most of them repeated ten times
+    assertEquals(newRowsSet(50), SplitUtils.findSplits(keys, 1));
+    assertEquals(newRowsSet(i -> i - 10, 30, 60), SplitUtils.findSplits(keys, 2));
+    assertEquals(newRowsSet(i -> i - 10, 20, 50, 70), SplitUtils.findSplits(keys, 3));
+    assertEquals(newRowsSet(IntStream.range(1, 10).map(i -> i * 10), i -> i - 10),
+        SplitUtils.findSplits(keys, 9));
+    assertEquals(newRowsSet(IntStream.range(0, 11).map(i -> i * 10), i -> i == 0 ? null : i - 10),
+        SplitUtils.findSplits(keys, 19));
+    assertEquals(newRowsSet(IntStream.range(0, 11).map(i -> i * 10), i -> i == 0 ? null : i - 10),
+        SplitUtils.findSplits(keys, 100));
+  }
+
+  @Test
+  public void testIndexIterator() {
+    Iterable<Key> keys = newIndexIterable(IntStream.range(1, 101), null, null);
+    assertEquals(newRowsSet(50), SplitUtils.findSplits(keys, 1));
+    assertEquals(newRowsSet(25, 50, 75), SplitUtils.findSplits(keys, 3));
+    assertEquals(newRowsSet(14, 29, 43, 57, 71, 86), SplitUtils.findSplits(keys, 6));
+    assertEquals(newRowsSet(IntStream.range(1, 101)), SplitUtils.findSplits(keys, 200));
+
+    keys = newIndexIterable(IntStream.range(1, 101), null, 50);
+    assertEquals(newRowsSet(75), SplitUtils.findSplits(keys, 1));
+    assertEquals(newRowsSet(67, 83), SplitUtils.findSplits(keys, 2));
+    assertEquals(newRowsSet(60, 70, 80, 90), SplitUtils.findSplits(keys, 4));
+    assertEquals(newRowsSet(IntStream.range(51, 101)), SplitUtils.findSplits(keys, 60));
+
+    keys = newIndexIterable(IntStream.range(1, 101), 50, null);
+    assertEquals(newRowsSet(25), SplitUtils.findSplits(keys, 1));
+    assertEquals(newRowsSet(17, 33), SplitUtils.findSplits(keys, 2));
+    assertEquals(newRowsSet(10, 20, 30, 40), SplitUtils.findSplits(keys, 4));
+    assertEquals(newRowsSet(IntStream.range(1, 51)), SplitUtils.findSplits(keys, 60));
+
+    keys = newIndexIterable(IntStream.range(1, 101), 75, 25);
+    assertEquals(newRowsSet(50), SplitUtils.findSplits(keys, 1));
+    assertEquals(newRowsSet(17 + 25, 33 + 25), SplitUtils.findSplits(keys, 2));
+    assertEquals(newRowsSet(35, 45, 55, 65), SplitUtils.findSplits(keys, 4));
+    assertEquals(newRowsSet(IntStream.range(26, 76)), SplitUtils.findSplits(keys, 60));
+  }
+
+  @Test
+  public void testIndexIteratorNoDataInRange() {
+    // Simulate the situation where the index has no data in a tablet's range because the tablet's
+    // row range falls between two index keys.
+    Iterable<Key> keys = newIndexIterable(IntStream.range(1, 101).map(i -> i * 1000), 250, 150);
+    assertFalse(keys.iterator().hasNext());
+    assertEquals(Set.of(), SplitUtils.findSplits(keys, 1));
+    assertEquals(Set.of(), SplitUtils.findSplits(keys, 2));
+  }
+
+  @Test
+  public void testNoSuchElement() {
+    var keys = newIndexIterable(IntStream.range(1, 101), 75, 25);
+    var iterator = keys.iterator();
+    // call next() w/o calling hasNext(), should work because there is data.
+    iterator.next();
+    assertEquals(50 - 1, Iterators.size(iterator));
+    assertThrows(NoSuchElementException.class, iterator::next);
+
+    // try an iterator w/ null rows
+    keys = newIndexIterable(IntStream.range(1, 101), null, null);
+    iterator = keys.iterator();
+    iterator.next();
+    assertEquals(100 - 1, Iterators.size(iterator));
+    assertThrows(NoSuchElementException.class, iterator::next);
+  }
+
+  @Test
+  public void testVariableLength() {
+    var keys = newIndexIterable(
+        Stream.of("aa11", "aa112", "b", "bg45", "ct", "cz7882", "mn", "mn009", "mnrtssd", "mnz076"),
+        null, null);
+    assertEquals(newRowSet("c"), SplitUtils.findSplits(keys, 1));
+    assertEquals(newRowSet("b", "m"), SplitUtils.findSplits(keys, 2));
+    assertEquals(newRowSet("b", "c", "mn0"), SplitUtils.findSplits(keys, 3));
+    assertEquals(newRowSet("aa112", "bg", "cz", "mn0"), SplitUtils.findSplits(keys, 4));
+    assertEquals(newRowSet("aa11", "aa112", "b", "bg", "c", "cz", "m", "mn0", "mnr", "mnz"),
+        SplitUtils.findSplits(keys, 10));
+  }
+}
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitterTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitterTest.java
new file mode 100644
index 0000000000..ad49006d5d
--- /dev/null
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitterTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.split;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Set;
+
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Test;
+
+public class SplitterTest {
+
+  @Test
+  public void testShouldInspect() {
+    ThreadPools threadPools = createNiceMock(ThreadPools.class);
+    replay(threadPools);
+    ServerContext context = createNiceMock(ServerContext.class);
+    expect(context.threadPools()).andReturn(threadPools).anyTimes();
+    replay(context);
+
+    var splitter = new Splitter(context, null, null);
+
+    KeyExtent ke1 = new KeyExtent(TableId.of("1"), new Text("m"), null);
+    KeyExtent ke2 = new KeyExtent(TableId.of("1"), null, new Text("m"));
+
+    Set<StoredTabletFile> files1 = Set.of(
+        new StoredTabletFile("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf"),
+        new StoredTabletFile(
+            "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf"));
+
+    TabletMetadata tabletMeta1 = createMock(TabletMetadata.class);
+    expect(tabletMeta1.getExtent()).andReturn(ke1).anyTimes();
+    expect(tabletMeta1.getFiles()).andReturn(files1).anyTimes();
+    replay(tabletMeta1);
+
+    TabletMetadata tabletMeta2 = createMock(TabletMetadata.class);
+    expect(tabletMeta2.getExtent()).andReturn(ke2).anyTimes();
+    replay(tabletMeta2);
+    // initially every tablet should be inspected
+    assertTrue(splitter.shouldInspect(tabletMeta1));
+    assertTrue(splitter.shouldInspect(tabletMeta2));
+    // a tablet with a split starting should not be inspected
+    splitter.addSplitStarting(ke1);
+
+    assertFalse(splitter.shouldInspect(tabletMeta1));
+    assertTrue(splitter.shouldInspect(tabletMeta2));
+    // removing the split starting marker makes the tablet a candidate again
+    splitter.removeSplitStarting(ke1);
+
+    assertTrue(splitter.shouldInspect(tabletMeta1));
+    assertTrue(splitter.shouldInspect(tabletMeta2));
+    // a tablet remembered as unsplittable should not be inspected while its files are unchanged
+    splitter.rememberUnsplittable(tabletMeta1);
+
+    assertFalse(splitter.shouldInspect(tabletMeta1));
+    assertTrue(splitter.shouldInspect(tabletMeta2));
+
+    // when a tablet's files change it should become a candidate for inspection
+    Set<StoredTabletFile> files2 = Set.of(
+        new StoredTabletFile("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf"),
+        new StoredTabletFile("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf"),
+        new StoredTabletFile(
+            "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000073.rf"));
+    TabletMetadata tabletMeta3 = createMock(TabletMetadata.class);
+    expect(tabletMeta3.getExtent()).andReturn(ke1).anyTimes();
+    expect(tabletMeta3.getFiles()).andReturn(files2).anyTimes();
+    replay(tabletMeta3);
+
+    assertTrue(splitter.shouldInspect(tabletMeta3));
+    assertTrue(splitter.shouldInspect(tabletMeta2));
+  }
+
+}
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java
new file mode 100644
index 0000000000..08f2151d5f
--- /dev/null
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.tableOps.split;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.server.util.FileUtil;
+import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.junit.jupiter.api.Test;
+
+public class UpdateTabletsTest {
+
+  StoredTabletFile newSTF(int fileNum) {
+    return new StoredTabletFile(
+        "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F00000" + fileNum + ".rf");
+  }
+
+  FileUtil.FileInfo newFileInfo(String start, String end) {
+    return new FileUtil.FileInfo(new Key(start), new Key(end));
+  }
+
+  // When a tablet splits, its files are partitioned among the new child tablets. This test
+  // exercises the partitioning code.
+  @Test
+  public void testFileParitioning() {
+
+    var file1 = newSTF(1);
+    var file2 = newSTF(2);
+    var file3 = newSTF(3);
+    var file4 = newSTF(4);
+
+    var tabletFiles =
+        Map.of(file1, new DataFileValue(1000, 100, 20), file2, new DataFileValue(2000, 200, 50),
+            file3, new DataFileValue(4000, 400), file4, new DataFileValue(4000, 400));
+
+    var ke1 = new KeyExtent(TableId.of("1"), new Text("m"), null);
+    var ke2 = new KeyExtent(TableId.of("1"), new Text("r"), new Text("m"));
+    var ke3 = new KeyExtent(TableId.of("1"), new Text("v"), new Text("r"));
+    var ke4 = new KeyExtent(TableId.of("1"), null, new Text("v"));
+
+    var firstAndLastKeys = Map.of(file2, newFileInfo("m", "r"), file3, newFileInfo("g", "x"), file4,
+        newFileInfo("s", "v"));
+
+    var ke1Expected = Map.of(file1, new DataFileValue(250, 25, 20), file2,
+        new DataFileValue(1000, 100, 50), file3, new DataFileValue(1000, 100));
+    var ke2Expected = Map.of(file1, new DataFileValue(250, 25, 20), file2,
+        new DataFileValue(1000, 100, 50), file3, new DataFileValue(1000, 100));
+    var ke3Expected = Map.of(file1, new DataFileValue(250, 25, 20), file3,
+        new DataFileValue(1000, 100), file4, new DataFileValue(4000, 400));
+    var ke4Expected =
+        Map.of(file1, new DataFileValue(250, 25, 20), file3, new DataFileValue(1000, 100));
+
+    var expected = Map.of(ke1, ke1Expected, ke2, ke2Expected, ke3, ke3Expected, ke4, ke4Expected);
+
+    Set<KeyExtent> newExtents = Set.of(ke1, ke2, ke3, ke4);
+
+    TabletMetadata tabletMeta = EasyMock.createMock(TabletMetadata.class);
+    EasyMock.expect(tabletMeta.getFilesMap()).andReturn(tabletFiles).anyTimes();
+    EasyMock.replay(tabletMeta);
+
+    Map<KeyExtent,Map<StoredTabletFile,DataFileValue>> results =
+        UpdateTablets.getNewTabletFiles(newExtents, tabletMeta, firstAndLastKeys::get);
+
+    assertEquals(expected.keySet(), results.keySet());
+    expected.forEach(((extent, files) -> {
+      assertEquals(files, results.get(extent));
+    }));
+
+    // Test a tablet with no files going to it
+
+    var tabletFiles2 = Map.of(file2, tabletFiles.get(file2), file4, tabletFiles.get(file4));
+    ke1Expected = Map.of(file2, new DataFileValue(1000, 100, 50));
+    ke2Expected = Map.of(file2, new DataFileValue(1000, 100, 50));
+    ke3Expected = Map.of(file4, new DataFileValue(4000, 400));
+    ke4Expected = Map.of();
+    expected = Map.of(ke1, ke1Expected, ke2, ke2Expected, ke3, ke3Expected, ke4, ke4Expected);
+
+    tabletMeta = EasyMock.createMock(TabletMetadata.class);
+    EasyMock.expect(tabletMeta.getFilesMap()).andReturn(tabletFiles2).anyTimes();
+    EasyMock.replay(tabletMeta);
+
+    Map<KeyExtent,Map<StoredTabletFile,DataFileValue>> results2 =
+        UpdateTablets.getNewTabletFiles(newExtents, tabletMeta, firstAndLastKeys::get);
+    assertEquals(expected.keySet(), results2.keySet());
+    expected.forEach(((extent, files) -> {
+      assertEquals(files, results2.get(extent));
+    }));
+
+  }
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 3d269e85bf..c7d8723a10 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -77,7 +77,6 @@ import org.apache.accumulo.core.metadata.TabletFile;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataTime;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
@@ -92,7 +91,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.volume.Volume;
-import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.compaction.CompactionStats;
 import org.apache.accumulo.server.compaction.PausedCompactionMetrics;
 import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
@@ -1410,12 +1408,18 @@ public class Tablet extends TabletBase {
     }
   }
 
+  // TODO remove this hack that disables splits
+  private boolean getTrue() {
+    return true;
+  }
+
   /**
    * Returns true if this tablet needs to be split
    *
    */
   public synchronized boolean needsSplit(Optional<SplitComputations> splitComputations) {
-    if (isClosing() || isClosed()) {
+    // TODO remove this hack that disables splits
+    if (isClosing() || isClosed() || getTrue()) {
       return false;
     }
     return findSplitRow(splitComputations) != null;
@@ -1554,7 +1558,7 @@ public class Tablet extends TabletBase {
       KeyExtent low = new KeyExtent(extent.tableId(), midRow, extent.prevEndRow());
       KeyExtent high = new KeyExtent(extent.tableId(), extent.endRow(), midRow);
 
-      String lowDirectoryName = createTabletDirectoryName(context, midRow);
+      String lowDirectoryName = UniqueNameAllocator.createTabletDirectoryName(context, midRow);
 
       // write new tablet information to MetadataTable
       SortedMap<StoredTabletFile,DataFileValue> lowDatafileSizes = new TreeMap<>();
@@ -2131,15 +2135,6 @@ public class Tablet extends TabletBase {
     return timer.getTabletStats();
   }
 
-  private static String createTabletDirectoryName(ServerContext context, Text endRow) {
-    if (endRow == null) {
-      return ServerColumnFamily.DEFAULT_TABLET_DIR_NAME;
-    } else {
-      UniqueNameAllocator namer = context.getUniqueNameAllocator();
-      return Constants.GENERATED_TABLET_DIRECTORY_PREFIX + namer.getNextName();
-    }
-  }
-
   public Set<Long> getBulkIngestedTxIds() {
     return bulkImported.keySet();
   }
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
index 5263c888d1..fb2c28ae09 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
@@ -99,6 +99,7 @@ public class BulkSplitOptimizationIT extends AccumuloClusterHarness {
 
       System.out.println("Number of generated files: " + stats.length);
       c.tableOperations().importDirectory(testDir.toString()).to(tableName).load();
+
       FunctionalTestUtils.checkSplits(c, tableName, 0, 0);
       FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 100, 100);
 
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
index 13d9150d9c..f143c91480 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
@@ -18,7 +18,6 @@
  */
 package org.apache.accumulo.test.functional;
 
-import static java.util.Collections.singletonMap;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -191,8 +190,9 @@ public class SplitIT extends AccumuloClusterHarness {
   public void deleteSplit() throws Exception {
     try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
       String tableName = getUniqueNames(1)[0];
-      c.tableOperations().create(tableName, new NewTableConfiguration()
-          .setProperties(singletonMap(Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K")));
+      c.tableOperations().create(tableName,
+          new NewTableConfiguration().setProperties(Map.of(Property.TABLE_SPLIT_THRESHOLD.getKey(),
+              "10K", Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "1K")));
       DeleteIT.deleteTest(c, getCluster(), tableName);
       c.tableOperations().flush(tableName, null, null, true);
       for (int i = 0; i < 5; i++) {