You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2023/05/25 18:27:35 UTC
[accumulo] branch elasticity updated: executes user initiated splits in manager (#3425)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new c08139caf5 executes user initiated splits in manager (#3425)
c08139caf5 is described below
commit c08139caf511dee6b7166c781a6169aa420dbdc4
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu May 25 14:27:29 2023 -0400
executes user initiated splits in manager (#3425)
Modifies the user API for adding splits so that it executes a FATE operation
instead of calling the tablet server. Now user-initiated splits can
happen without having to host a tablet.
---
.../core/clientImpl/TableOperationsImpl.java | 274 ++++++++-------------
.../core/manager/state/TabletManagement.java | 9 +-
.../core/metadata/schema/TabletMetadata.java | 7 +-
.../core/clientImpl/thrift/TableOperation.java | 5 +-
.../core/manager/thrift/FateOperation.java | 5 +-
core/src/main/thrift/client.thrift | 1 +
core/src/main/thrift/manager.thrift | 1 +
.../manager/state/TabletManagementIterator.java | 17 +-
.../accumulo/manager/FateServiceHandler.java | 53 ++++
.../java/org/apache/accumulo/manager/Manager.java | 4 +
.../manager/tableOps/split/DeleteOperationIds.java | 6 +
.../accumulo/manager/tableOps/split/PreSplit.java | 2 +-
.../test/functional/ManagerAssignmentIT.java | 61 ++++-
.../functional/TabletManagementIteratorIT.java | 28 ++-
14 files changed, 285 insertions(+), 188 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index 37dbe8488a..a063cff241 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@ -59,10 +59,11 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -140,8 +141,6 @@ import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.summary.SummarizerConfigurationUtil;
import org.apache.accumulo.core.summary.SummaryCollection;
-import org.apache.accumulo.core.tablet.thrift.TabletManagementClientService;
-import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
@@ -155,7 +154,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.transport.TTransportException;
@@ -164,7 +162,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
-import com.google.common.net.HostAndPort;
public class TableOperationsImpl extends TableOperationsHelper {
@@ -435,198 +432,143 @@ public class TableOperationsImpl extends TableOperationsHelper {
}
}
- private static class SplitEnv {
- private final String tableName;
- private final TableId tableId;
- private final ExecutorService executor;
- private final CountDownLatch latch;
- private final AtomicReference<Exception> exception;
-
- SplitEnv(String tableName, TableId tableId, ExecutorService executor, CountDownLatch latch,
- AtomicReference<Exception> exception) {
- this.tableName = tableName;
- this.tableId = tableId;
- this.executor = executor;
- this.latch = latch;
- this.exception = exception;
- }
- }
-
- private class SplitTask implements Runnable {
-
- private List<Text> splits;
- private SplitEnv env;
-
- SplitTask(SplitEnv env, List<Text> splits) {
- this.env = env;
- this.splits = splits;
- }
+ /**
+ * On the server side the fate operation will exit w/o an error if the tablet requested to split
+ * does not exist. When this happens it will also return an empty string. In the case where the
+ * fate operation successfully splits the tablet it will return the following string. This code
+ * uses this return value to see if it needs to retry finding the tablet.
+ */
+ public static final String SPLIT_SUCCESS_MSG = "SPLIT_SUCCEEDED";
- @Override
- public void run() {
- try {
- if (env.exception.get() != null) {
- return;
- }
+ @Override
+ public void addSplits(String tableName, SortedSet<Text> splits)
+ throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
- if (splits.size() <= 2) {
- addSplits(env, new TreeSet<>(splits));
- splits.forEach(s -> env.latch.countDown());
- return;
- }
+ EXISTING_TABLE_NAME.validate(tableName);
- int mid = splits.size() / 2;
+ TableId tableId = context.getTableId(tableName);
- // split the middle split point to ensure that child task split
- // different tablets and can therefore run in parallel
- addSplits(env, new TreeSet<>(splits.subList(mid, mid + 1)));
- env.latch.countDown();
+ // TODO should there be a server side check for this?
+ context.requireNotOffline(tableId, tableName);
- env.executor.execute(new SplitTask(env, splits.subList(0, mid)));
- env.executor.execute(new SplitTask(env, splits.subList(mid + 1, splits.size())));
+ ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, tableId);
- } catch (Exception t) {
- env.exception.compareAndSet(null, t);
- }
- }
+ SortedSet<Text> splitsTodo = new TreeSet<>(splits);
+ ExecutorService executor = context.threadPools().createFixedThreadPool(16, "addSplits", false);
+ try {
+ while (!splitsTodo.isEmpty()) {
- }
+ tabLocator.invalidateCache();
- @Override
- public void addSplits(String tableName, SortedSet<Text> partitionKeys)
- throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
- EXISTING_TABLE_NAME.validate(tableName);
+ Map<KeyExtent,List<Text>> tabletSplits =
+ mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
- TableId tableId = context.getTableId(tableName);
- List<Text> splits = new ArrayList<>(partitionKeys);
+ List<Future<List<Text>>> splitTasks = new ArrayList<>();
- // should be sorted because we copied from a sorted set, but that makes
- // assumptions about how the copy was done so resort to be sure.
- Collections.sort(splits);
- CountDownLatch latch = new CountDownLatch(splits.size());
- AtomicReference<Exception> exception = new AtomicReference<>(null);
+ for (Entry<KeyExtent,List<Text>> splitsForTablet : tabletSplits.entrySet()) {
+ Callable<List<Text>> splitTask = createSplitTask(tableName, splitsForTablet);
+ splitTasks.add(executor.submit(splitTask));
+ }
- ExecutorService executor = context.threadPools().createFixedThreadPool(16, "addSplits", false);
- try {
- executor.execute(
- new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits));
-
- while (!latch.await(100, MILLISECONDS)) {
- if (exception.get() != null) {
- executor.shutdownNow();
- Throwable excep = exception.get();
- // Below all exceptions are wrapped and rethrown. This is done so that the user knows what
- // code path got them here. If the wrapping was not done, the
- // user would only have the stack trace for the background thread.
- if (excep instanceof TableNotFoundException) {
- TableNotFoundException tnfe = (TableNotFoundException) excep;
- throw new TableNotFoundException(tableId.canonical(), tableName,
- "Table not found by background thread", tnfe);
- } else if (excep instanceof TableOfflineException) {
- log.debug("TableOfflineException occurred in background thread. Throwing new exception",
- excep);
- throw new TableOfflineException(tableId, tableName);
- } else if (excep instanceof AccumuloSecurityException) {
- // base == background accumulo security exception
- AccumuloSecurityException base = (AccumuloSecurityException) excep;
- throw new AccumuloSecurityException(base.getUser(), base.asThriftException().getCode(),
- base.getTableInfo(), excep);
- } else if (excep instanceof AccumuloServerException) {
- throw new AccumuloServerException((AccumuloServerException) excep);
- } else if (excep instanceof Error) {
- throw new Error(excep);
- } else {
- throw new AccumuloException(excep);
+ for (var future : splitTasks) {
+ try {
+ var completedSplits = future.get();
+ completedSplits.forEach(splitsTodo::remove);
+ } catch (ExecutionException ee) {
+ Throwable excep = ee.getCause();
+ // Below all exceptions are wrapped and rethrown. This is done so that the user knows
+ // what
+ // code path got them here. If the wrapping was not done, the user would only have the
+ // stack trace for the background thread.
+ if (excep instanceof TableNotFoundException) {
+ TableNotFoundException tnfe = (TableNotFoundException) excep;
+ throw new TableNotFoundException(tableId.canonical(), tableName,
+ "Table not found by background thread", tnfe);
+ } else if (excep instanceof TableOfflineException) {
+ log.debug(
+ "TableOfflineException occurred in background thread. Throwing new exception",
+ excep);
+ throw new TableOfflineException(tableId, tableName);
+ } else if (excep instanceof AccumuloSecurityException) {
+ // base == background accumulo security exception
+ AccumuloSecurityException base = (AccumuloSecurityException) excep;
+ throw new AccumuloSecurityException(base.getUser(),
+ base.asThriftException().getCode(), base.getTableInfo(), excep);
+ } else if (excep instanceof AccumuloServerException) {
+ throw new AccumuloServerException((AccumuloServerException) excep);
+ } else {
+ throw new AccumuloException(excep);
+ }
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
}
}
}
- } catch (InterruptedException e) {
- throw new IllegalStateException(e);
} finally {
- executor.shutdown();
+ executor.shutdownNow();
}
}
- private void addSplits(SplitEnv env, SortedSet<Text> partitionKeys)
- throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
- AccumuloServerException, InvalidTabletHostingRequestException {
-
- ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, env.tableId);
- for (Text split : partitionKeys) {
- boolean successful = false;
- int attempt = 0;
- long locationFailures = 0;
+ private Map<KeyExtent,List<Text>> mapSplitsToTablets(String tableName, TableId tableId,
+ ClientTabletCache tabLocator, SortedSet<Text> splitsTodo)
+ throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+ Map<KeyExtent,List<Text>> tabletSplits = new HashMap<>();
- while (!successful) {
+ var iterator = splitsTodo.iterator();
+ while (iterator.hasNext()) {
+ var split = iterator.next();
- if (attempt > 0) {
- sleepUninterruptibly(100, MILLISECONDS);
+ try {
+ var tablet = tabLocator.findTablet(context, split, false, LocationNeed.NOT_REQUIRED);
+ if (tablet == null) {
+ context.requireTableExists(tableId, tableName);
+ throw new IllegalStateException("Unable to find a tablet for split " + split
+ + " in table " + tableName + " " + tableId);
}
- attempt++;
-
- CachedTablet tl = tabLocator.findTablet(context, split, false, LocationNeed.REQUIRED);
-
- if (tl == null) {
- context.requireTableExists(env.tableId, env.tableName);
- context.requireNotOffline(env.tableId, env.tableName);
+ if (split.equals(tablet.getExtent().endRow())) {
+ // split already exists, so remove it
+ iterator.remove();
continue;
}
- HostAndPort address = HostAndPort.fromString(tl.getTserverLocation().orElseThrow());
-
- try {
- TabletManagementClientService.Client client =
- ThriftUtil.getClient(ThriftClientTypes.TABLET_MGMT, address, context);
- try {
-
- OpTimer timer = null;
-
- if (log.isTraceEnabled()) {
- log.trace("tid={} Splitting tablet {} on {} at {}", Thread.currentThread().getId(),
- tl.getExtent(), address, split);
- timer = new OpTimer().start();
- }
-
- client.splitTablet(TraceUtil.traceInfo(), context.rpcCreds(), tl.getExtent().toThrift(),
- TextUtil.getByteBuffer(split));
+ tabletSplits.computeIfAbsent(tablet.getExtent(), k -> new ArrayList<>()).add(split);
- // just split it, might as well invalidate it in the cache
- tabLocator.invalidateCache(tl.getExtent());
+ } catch (InvalidTabletHostingRequestException e) {
+ // not expected
+ throw new AccumuloException(e);
+ }
+ }
+ return tabletSplits;
+ }
- if (timer != null) {
- timer.stop();
- log.trace("Split tablet in {}", String.format("%.3f secs", timer.scale(SECONDS)));
- }
+ private Callable<List<Text>> createSplitTask(String tableName,
+ Entry<KeyExtent,List<Text>> splitsForTablet) {
+ Callable<List<Text>> splitTask = () -> {
+ var extent = splitsForTablet.getKey();
- } finally {
- ThriftUtil.returnClient(client, context);
- }
+ ByteBuffer EMPTY = ByteBuffer.allocate(0);
- } catch (TApplicationException tae) {
- throw new AccumuloServerException(address.toString(), tae);
- } catch (ThriftSecurityException e) {
- context.clearTableListCache();
- context.requireTableExists(env.tableId, env.tableName);
- throw new AccumuloSecurityException(e.user, e.code, e);
- } catch (NotServingTabletException e) {
- // Do not silently spin when we repeatedly fail to get the location for a tablet
- locationFailures++;
- if (locationFailures == 5 || locationFailures % 50 == 0) {
- log.warn("Having difficulty locating hosting tabletserver for split {} on table {}."
- + " Seen {} failures.", split, env.tableName, locationFailures);
- }
+ List<ByteBuffer> args = new ArrayList<>();
+ args.add(ByteBuffer.wrap(extent.tableId().canonical().getBytes(UTF_8)));
+ args.add(extent.endRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.endRow()));
+ args.add(extent.prevEndRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.prevEndRow()));
+ splitsForTablet.getValue().forEach(split -> args.add(TextUtil.getByteBuffer(split)));
- tabLocator.invalidateCache(tl.getExtent());
- continue;
- } catch (TException e) {
- tabLocator.invalidateCache(context, tl.getTserverLocation().orElseThrow());
- continue;
+ try {
+ String status = doFateOperation(FateOperation.TABLE_SPLIT, args, Map.of(), tableName);
+ if (SPLIT_SUCCESS_MSG.equals(status)) {
+ // the fate operation successfully created the splits, so these splits are done
+ return splitsForTablet.getValue();
+ } else {
+ // splits did not succeed
+ return List.of();
}
-
- successful = true;
+ } catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException e) {
+ throw new RuntimeException(e);
}
- }
+ };
+ return splitTask;
}
@Override
diff --git a/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java b/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java
index a9233819d6..73d7971898 100644
--- a/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java
+++ b/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java
@@ -41,14 +41,15 @@ import com.google.common.base.Splitter;
*/
public class TabletManagement {
- public static final EnumSet<ColumnType> CONFIGURED_COLUMNS = EnumSet.of(ColumnType.PREV_ROW,
- ColumnType.LOCATION, ColumnType.SUSPEND, ColumnType.LOGS, ColumnType.CHOPPED,
- ColumnType.HOSTING_GOAL, ColumnType.HOSTING_REQUESTED, ColumnType.FILES, ColumnType.LAST);
+ public static final EnumSet<ColumnType> CONFIGURED_COLUMNS =
+ EnumSet.of(ColumnType.PREV_ROW, ColumnType.LOCATION, ColumnType.SUSPEND, ColumnType.LOGS,
+ ColumnType.CHOPPED, ColumnType.HOSTING_GOAL, ColumnType.HOSTING_REQUESTED,
+ ColumnType.FILES, ColumnType.LAST, ColumnType.OPID);
private static final String REASONS_COLUMN_NAME = "REASONS";
public static enum ManagementAction {
- BAD_STATE, NEEDS_COMPACTING, NEEDS_LOCATION_UPDATE, IS_MERGING, IS_MIGRATING, NEEDS_SPLITTING;
+ BAD_STATE, NEEDS_COMPACTING, NEEDS_LOCATION_UPDATE, IS_MERGING, NEEDS_SPLITTING;
}
public static void addActions(final SortedMap<Key,Value> decodedRow,
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
index 829ea52c02..7e043c258a 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
@@ -76,7 +76,6 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Se
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -559,11 +558,7 @@ public class TabletMetadata {
case HostingColumnFamily.STR_NAME:
switch (qual) {
case GOAL_QUAL:
- if (StringUtils.isEmpty(kv.getValue().toString())) {
- te.goal = TabletHostingGoal.ONDEMAND;
- } else {
- te.goal = TabletHostingGoalUtil.fromValue(kv.getValue());
- }
+ te.goal = TabletHostingGoalUtil.fromValue(kv.getValue());
break;
case REQUESTED_QUAL:
te.onDemandHostingRequested = true;
diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/clientImpl/thrift/TableOperation.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/clientImpl/thrift/TableOperation.java
index bc2deb978b..673a1dec18 100644
--- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/clientImpl/thrift/TableOperation.java
+++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/clientImpl/thrift/TableOperation.java
@@ -43,7 +43,8 @@ public enum TableOperation implements org.apache.thrift.TEnum {
IMPORT(14),
EXPORT(15),
COMPACT_CANCEL(16),
- SET_HOSTING_GOAL(17);
+ SET_HOSTING_GOAL(17),
+ SPLIT(18);
private final int value;
@@ -102,6 +103,8 @@ public enum TableOperation implements org.apache.thrift.TEnum {
return COMPACT_CANCEL;
case 17:
return SET_HOSTING_GOAL;
+ case 18:
+ return SPLIT;
default:
return null;
}
diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateOperation.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateOperation.java
index d93a7eafaa..29c0cbe578 100644
--- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateOperation.java
+++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateOperation.java
@@ -43,7 +43,8 @@ public enum FateOperation implements org.apache.thrift.TEnum {
NAMESPACE_DELETE(14),
NAMESPACE_RENAME(15),
TABLE_BULK_IMPORT2(16),
- TABLE_HOSTING_GOAL(17);
+ TABLE_HOSTING_GOAL(17),
+ TABLE_SPLIT(18);
private final int value;
@@ -102,6 +103,8 @@ public enum FateOperation implements org.apache.thrift.TEnum {
return TABLE_BULK_IMPORT2;
case 17:
return TABLE_HOSTING_GOAL;
+ case 18:
+ return TABLE_SPLIT;
default:
return null;
}
diff --git a/core/src/main/thrift/client.thrift b/core/src/main/thrift/client.thrift
index 9c430d4001..700ca7a6db 100644
--- a/core/src/main/thrift/client.thrift
+++ b/core/src/main/thrift/client.thrift
@@ -40,6 +40,7 @@ enum TableOperation {
EXPORT
COMPACT_CANCEL
SET_HOSTING_GOAL
+ SPLIT
}
enum TableOperationExceptionType {
diff --git a/core/src/main/thrift/manager.thrift b/core/src/main/thrift/manager.thrift
index 3e97977258..51738e7094 100644
--- a/core/src/main/thrift/manager.thrift
+++ b/core/src/main/thrift/manager.thrift
@@ -68,6 +68,7 @@ enum FateOperation {
NAMESPACE_RENAME
TABLE_BULK_IMPORT2
TABLE_HOSTING_GOAL
+ TABLE_SPLIT
}
enum ManagerState {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
index fb393ce25a..0556e68726 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
@@ -57,6 +57,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Fu
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
@@ -217,10 +218,16 @@ public class TabletManagementIterator extends SkippingIterator {
.collect(Collectors.summarizingLong(Long::longValue)).getSum() > splitThreshold;
}
- private static boolean shouldReturnDueToLocation(final TabletMetadata tm,
+ private boolean shouldReturnDueToLocation(final TabletMetadata tm,
final Set<TableId> onlineTables, final Set<TServerInstance> current, final boolean debug) {
+
+ if (migrations.contains(tm.getExtent())) {
+ return true;
+ }
+
// is the table supposed to be online or offline?
- final boolean shouldBeOnline = onlineTables.contains(tm.getTableId());
+ final boolean shouldBeOnline =
+ onlineTables.contains(tm.getTableId()) && tm.getOperationId() == null;
TabletState state = tm.getTabletState(current);
if (debug) {
@@ -262,6 +269,7 @@ public class TabletManagementIterator extends SkippingIterator {
scanner.fetchColumnFamily(LogColumnFamily.NAME);
scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
scanner.fetchColumnFamily(HostingColumnFamily.NAME);
+ ServerColumnFamily.OPID_COLUMN.fetch(scanner);
scanner.addScanIterator(new IteratorSetting(1000, "wholeRows", WholeRowIterator.class));
IteratorSetting tabletChange =
new IteratorSetting(1001, "ManagerTabletInfoIterator", TabletManagementIterator.class);
@@ -393,11 +401,6 @@ public class TabletManagementIterator extends SkippingIterator {
reasonsToReturnThisTablet.add(ManagementAction.IS_MERGING);
}
- // always return the information for migrating tablets
- if (migrations.contains(tm.getExtent())) {
- reasonsToReturnThisTablet.add(ManagementAction.IS_MIGRATING);
- }
-
if (shouldReturnDueToLocation(tm, onlineTables, current, debug)) {
reasonsToReturnThisTablet.add(ManagementAction.NEEDS_LOCATION_UPDATE);
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
index 785c056874..9cff4251dc 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
@@ -41,6 +41,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -63,6 +65,7 @@ import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TRange;
import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
import org.apache.accumulo.core.manager.thrift.BulkImportState;
@@ -89,6 +92,7 @@ import org.apache.accumulo.manager.tableOps.namespace.create.CreateNamespace;
import org.apache.accumulo.manager.tableOps.namespace.delete.DeleteNamespace;
import org.apache.accumulo.manager.tableOps.namespace.rename.RenameNamespace;
import org.apache.accumulo.manager.tableOps.rename.RenameTable;
+import org.apache.accumulo.manager.tableOps.split.PreSplit;
import org.apache.accumulo.manager.tableOps.tableExport.ExportTable;
import org.apache.accumulo.manager.tableOps.tableImport.ImportTable;
import org.apache.accumulo.server.client.ClientServiceHandler;
@@ -694,6 +698,55 @@ class FateServiceHandler implements FateService.Iface {
goalMessage);
break;
}
+ case TABLE_SPLIT: {
+ TableOperation tableOp = TableOperation.SPLIT;
+
+ // ELASTICITY_TODO this does not check if table is offline for now, that is usually done in
+ // FATE operation with a table lock. Deferring that check for now as its possible tablet
+ // locks may not be needed.
+
+ int SPLIT_OFFSET = 3; // offset where split data begins in arguments list
+ if (arguments.size() < (SPLIT_OFFSET + 1)) {
+ throw new ThriftTableOperationException(null, null, tableOp,
+ TableOperationExceptionType.OTHER,
+ "Expected at least " + (SPLIT_OFFSET + 1) + " arguments, saw :" + arguments.size());
+ }
+
+ var tableId = validateTableIdArgument(arguments.get(0), tableOp, NOT_ROOT_TABLE_ID);
+ NamespaceId namespaceId = getNamespaceIdFromTableId(tableOp, tableId);
+
+ boolean canSplit;
+
+ try {
+ canSplit = manager.security.canSplitTablet(c, tableId, namespaceId);
+ } catch (ThriftSecurityException e) {
+ throwIfTableMissingSecurityException(e, tableId, null, TableOperation.SPLIT);
+ throw e;
+ }
+
+ if (!canSplit) {
+ throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+ }
+
+ var endRow = ByteBufferUtil.toText(arguments.get(1));
+ var prevEndRow = ByteBufferUtil.toText(arguments.get(2));
+
+ endRow = endRow.getLength() == 0 ? null : endRow;
+ prevEndRow = prevEndRow.getLength() == 0 ? null : prevEndRow;
+
+ // ELASTICITY_TODO create table stores splits in a file, maybe this operation should do the
+ // same
+ SortedSet<Text> splits = arguments.subList(SPLIT_OFFSET, arguments.size()).stream()
+ .map(ByteBufferUtil::toText).collect(Collectors.toCollection(TreeSet::new));
+
+ KeyExtent extent = new KeyExtent(tableId, endRow, prevEndRow);
+ manager.requestUnassignment(extent, opid);
+
+ goalMessage = "Splitting " + extent + " for user into " + (splits.size() + 1) + " tablets";
+ manager.fate().seedTransaction(op.toString(), opid, new PreSplit(extent, splits),
+ autoCleanup, goalMessage);
+ break;
+ }
default:
throw new UnsupportedOperationException();
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index ee84a35273..401d73d91d 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -706,6 +706,10 @@ public class Manager extends AbstractServer
return TabletGoalState.UNASSIGNED;
}
+ if (tm.getOperationId() != null) {
+ return TabletGoalState.UNASSIGNED;
+ }
+
if (tm.hasCurrent() && serversToShutdown.contains(tm.getLocation().getServerInstance())) {
return TabletGoalState.SUSPENDED;
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java
index 6528b7e21c..6ccba834fa 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.manager.tableOps.split;
import java.util.stream.Collectors;
+import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
@@ -68,4 +69,9 @@ public class DeleteOperationIds extends ManagerRepo {
return null;
}
+
+ @Override
+ public String getReturn() {
+ return TableOperationsImpl.SPLIT_SUCCESS_MSG;
+ }
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
index ae0bfae78a..592f552692 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
@@ -121,7 +121,7 @@ public class PreSplit extends ManagerRepo {
var opid = TabletOperationId.from(TabletOperationType.SPLITTING, tid);
- if (tabletMetadata == null || !tabletMetadata.getOperationId().equals(opid)) {
+ if (tabletMetadata == null || !opid.equals(tabletMetadata.getOperationId())) {
// the tablet no longer exists or we could not set the operation id, maybe another operation
// was running, lets not proceed with the split.
var optMeta = Optional.ofNullable(tabletMetadata);
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java
index 1b0bba78fa..93626de958 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java
@@ -28,8 +28,10 @@ import static org.junit.jupiter.api.Assertions.fail;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.accumulo.core.client.Accumulo;
@@ -50,13 +52,19 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.manager.state.TabletManagement;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
+import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.metadata.schema.TabletOperationType;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
+import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.spi.ondemand.DefaultOnDemandTabletUnloader;
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.core.trace.TraceUtil;
@@ -192,7 +200,6 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase {
assertNull(ondemand.getLocation());
assertEquals(flushed.getLocation().getHostPort(), ondemand.getLast().getHostPort());
assertEquals(TabletHostingGoal.ONDEMAND, ondemand.getHostingGoal());
-
}
}
@@ -357,6 +364,49 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase {
}
}
+  /**
+   * Verifies that a tablet whose metadata carries an operation id is not assigned even when its
+   * hosting goal is ALWAYS, and that it does get assigned once the operation id is removed.
+   */
+  @Test
+  public void testOpidPreventsAssignment() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = super.getUniqueNames(1)[0];
+
+      var tableId = TableId.of(prepTableForScanTest(c, tableName));
+      assertEquals(0, countTabletsWithLocation(c, tableId));
+
+      assertEquals(Set.of("f", "m", "t"), c.tableOperations().listSplits(tableName).stream()
+          .map(Text::toString).collect(Collectors.toSet()));
+
+      // the tablet (f,m] gets an operation id, which should keep it from being assigned
+      var extent = new KeyExtent(tableId, new Text("m"), new Text("f"));
+
+      // the operation id is written straight to the metadata table, which needs WRITE; revoke it
+      // afterwards so the grant does not leak into other tests sharing this mini cluster
+      c.securityOperations().grantTablePermission(getPrincipal(), MetadataTable.NAME,
+          TablePermission.WRITE);
+      try {
+        try (var writer = c.createBatchWriter(MetadataTable.NAME)) {
+          var opid = TabletOperationId.from(TabletOperationType.SPLITTING, 42L);
+          Mutation m = new Mutation(extent.toMetaRow());
+          TabletsSection.ServerColumnFamily.OPID_COLUMN.put(m, new Value(opid.canonical()));
+          writer.addMutation(m);
+        }
+
+        c.tableOperations().setTabletHostingGoal(tableName, new Range(), TabletHostingGoal.ALWAYS);
+
+        Wait.waitFor(() -> countTabletsWithLocation(c, tableId) >= 3);
+
+        // there are four tablets, but one has an operation id set and should not be assigned
+        assertEquals(3, countTabletsWithLocation(c, tableId));
+
+        // clear the operation id so the tablet becomes eligible for assignment again
+        try (var writer = c.createBatchWriter(MetadataTable.NAME)) {
+          Mutation m = new Mutation(extent.toMetaRow());
+          TabletsSection.ServerColumnFamily.OPID_COLUMN.putDelete(m);
+          writer.addMutation(m);
+        }
+      } finally {
+        c.securityOperations().revokeTablePermission(getPrincipal(), MetadataTable.NAME,
+            TablePermission.WRITE);
+      }
+
+      Wait.waitFor(() -> countTabletsWithLocation(c, tableId) >= 4);
+
+      // after the operation id is deleted the tablet should be assigned
+      assertEquals(4, countTabletsWithLocation(c, tableId));
+    }
+  }
+
public static void loadDataForScan(AccumuloClient c, String tableName)
throws MutationsRejectedException, TableNotFoundException {
final byte[] empty = new byte[0];
@@ -373,6 +423,15 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase {
}
}
+ public static Ample getAmple(AccumuloClient c) {
+ return ((ClientContext) c).getAmple();
+ }
+
+ public static long countTabletsWithLocation(AccumuloClient c, TableId tableId) {
+ return getAmple(c).readTablets().forTable(tableId).fetch(TabletMetadata.ColumnType.LOCATION)
+ .build().stream().filter(tabletMetadata -> tabletMetadata.getLocation() != null).count();
+ }
+
public static List<TabletStats> getTabletStats(AccumuloClient c, String tableId)
throws AccumuloException, AccumuloSecurityException {
return ThriftClientTypes.TABLET_SERVER.execute((ClientContext) c, client -> client
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
index a62100715b..1b3a14df5e 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
@@ -62,8 +62,11 @@ import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.manager.thrift.ManagerState;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.metadata.schema.TabletOperationType;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.harness.AccumuloClusterHarness;
@@ -148,10 +151,16 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness {
assertEquals(2, findTabletsNeedingAttention(client, metaCopy1, state),
"Should have two tablets without a loc");
+ // Test setting the operation id on one of the tablets in table t1. Table t1 has two tablets
+ // w/o a location. Only one should need attention because of the operation id.
+ setOperationId(client, metaCopy1, t1);
+ assertEquals(1, findTabletsNeedingAttention(client, metaCopy1, state),
+ "Should have not tablets needing attention because of operation id");
+
// test the cases where the assignment is to a dead tserver
reassignLocation(client, metaCopy2, t3);
assertEquals(1, findTabletsNeedingAttention(client, metaCopy2, state),
- "Should have one tablet that needs to be unassigned");
+ "Only 1 of 2 tablets in table t1 should be returned");
// test the cases where there is ongoing merges
state = new State(client) {
@@ -225,6 +234,23 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness {
}
}
+  /**
+   * Writes a SPLITTING operation id (id 42) onto the first tablet of a table.
+   *
+   * @param client client used to scan and write
+   * @param table table holding the tablet metadata rows to modify (callers in this test appear
+   *        to pass a metadata copy)
+   * @param tableNameToModify name of the table whose first tablet receives the operation id
+   */
+  private void setOperationId(AccumuloClient client, String table, String tableNameToModify)
+      throws TableNotFoundException, MutationsRejectedException {
+    String tableIdStr = client.tableOperations().tableIdMap().get(tableNameToModify);
+    TableId tableIdToModify = TableId.of(tableIdStr);
+    Mutation m;
+    try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
+      scanner.setRange(new KeyExtent(tableIdToModify, null, null).toMetaRange());
+      // the first metadata row in the table's range is the one that gets modified
+      var firstRow = scanner.iterator().next().getKey().getRow();
+      m = new Mutation(firstRow);
+    }
+    var opid = TabletOperationId.from(TabletOperationType.SPLITTING, 42L);
+    MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN.put(m,
+        new Value(opid.canonical()));
+    try (BatchWriter bw = client.createBatchWriter(table)) {
+      bw.addMutation(m);
+    }
+  }
+
private void removeLocation(AccumuloClient client, String table, String tableNameToModify)
throws TableNotFoundException, MutationsRejectedException {
TableId tableIdToModify =