You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2023/07/05 19:03:37 UTC
[accumulo] branch elasticity updated: Removes external compaction final state metadata (#3565)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new a1f1fb85b2 Removes external compaction final state metadata (#3565)
a1f1fb85b2 is described below
commit a1f1fb85b2e8dd715b04f2676ad83ac035815de8
Author: Keith Turner <kt...@apache.org>
AuthorDate: Wed Jul 5 15:03:32 2023 -0400
Removes external compaction final state metadata (#3565)
Removes metadata related to how the external compaction commit
process used to work.
fixes #3465
---
.../accumulo/core/metadata/schema/Ample.java | 14 ---
.../schema/ExternalCompactionFinalState.java | 139 ---------------------
.../core/metadata/schema/MetadataSchema.java | 13 --
.../ClusterServerConfiguration.java | 2 +-
.../accumulo/server/metadata/ServerAmpleImpl.java | 52 --------
.../coordinator/DeadCompactionDetector.java | 2 +-
.../accumulo/manager/upgrade/Upgrader11to12.java | 70 ++++++++++-
.../accumulo/tserver/tablet/CompactableImpl.java | 8 +-
.../test/compaction/ExternalCompactionTServer.java | 47 -------
.../compaction/ExternalCompactionTestUtils.java | 27 ++--
.../test/compaction/ExternalCompaction_1_IT.java | 136 +++-----------------
.../test/compaction/ExternalCompaction_2_IT.java | 95 --------------
12 files changed, 111 insertions(+), 494 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 78a047dabe..082296446a 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -214,20 +214,6 @@ public interface Ample {
throw new UnsupportedOperationException();
}
- default void
- putExternalCompactionFinalStates(Collection<ExternalCompactionFinalState> finalStates) {
- throw new UnsupportedOperationException();
- }
-
- default Stream<ExternalCompactionFinalState> getExternalCompactionFinalStates() {
- throw new UnsupportedOperationException();
- }
-
- default void
- deleteExternalCompactionFinalStates(Collection<ExternalCompactionId> statusesToDelete) {
- throw new UnsupportedOperationException();
- }
-
/**
* Return an encoded delete marker Mutation to delete the specified TabletFile path. A
* ReferenceFile is used for the parameter because the Garbage Collector is optimized to store a
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java
deleted file mode 100644
index 907982f2ed..0000000000
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.core.metadata.schema;
-
-import static org.apache.accumulo.core.util.LazySingletons.GSON;
-
-import java.util.Base64;
-
-import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.util.TextUtil;
-import org.apache.hadoop.io.Text;
-
-import com.google.common.base.Preconditions;
-
-// ELASTICITY_TODO remove this class, remove it from ample, add upgrade code to remove it from metadata table
-public class ExternalCompactionFinalState {
-
- public enum FinalState {
- FINISHED, FAILED
- }
-
- private final ExternalCompactionId ecid;
- private final KeyExtent extent;
- private final FinalState state;
- private final long fileSize;
- private final long fileEntries;
-
- public ExternalCompactionFinalState(ExternalCompactionId ecid, KeyExtent extent, FinalState state,
- long fileSize, long fileEntries) {
- this.ecid = ecid;
- this.extent = extent;
- this.state = state;
- this.fileSize = fileSize;
- this.fileEntries = fileEntries;
- }
-
- public ExternalCompactionId getExternalCompactionId() {
- return ecid;
- }
-
- public FinalState getFinalState() {
- return state;
- }
-
- public KeyExtent getExtent() {
- return extent;
- }
-
- public long getFileSize() {
- Preconditions.checkState(state == FinalState.FINISHED);
- return fileSize;
- }
-
- public long getEntries() {
- Preconditions.checkState(state == FinalState.FINISHED);
- return fileEntries;
- }
-
- // This class is used to serialize and deserialize this class using GSon. Any changes to this
- // class must consider persisted data.
- private static class Extent {
-
- final String tableId;
- final String er;
- final String per;
-
- Extent(KeyExtent extent) {
- this.tableId = extent.tableId().canonical();
- if (extent.endRow() != null) {
- er = Base64.getEncoder().encodeToString(TextUtil.getBytes(extent.endRow()));
- } else {
- er = null;
- }
-
- if (extent.prevEndRow() != null) {
- per = Base64.getEncoder().encodeToString(TextUtil.getBytes(extent.prevEndRow()));
- } else {
- per = null;
- }
- }
-
- private Text decode(String s) {
- if (s == null) {
- return null;
- }
- return new Text(Base64.getDecoder().decode(s));
- }
-
- KeyExtent toKeyExtent() {
- return new KeyExtent(TableId.of(tableId), decode(er), decode(per));
- }
- }
-
- // This class is used to serialize and deserialize this class using GSon. Any changes to this
- // class must consider persisted data.
- private static class JsonData {
- Extent extent;
- String state;
- long fileSize;
- long entries;
- }
-
- public String toJson() {
- JsonData jd = new JsonData();
- jd.state = state.name();
- jd.fileSize = fileSize;
- jd.entries = fileEntries;
- jd.extent = new Extent(extent);
- return GSON.get().toJson(jd);
- }
-
- public static ExternalCompactionFinalState fromJson(ExternalCompactionId ecid, String json) {
- JsonData jd = GSON.get().fromJson(json, JsonData.class);
- return new ExternalCompactionFinalState(ecid, jd.extent.toKeyExtent(),
- FinalState.valueOf(jd.state), jd.fileSize, jd.entries);
- }
-
- @Override
- public String toString() {
- return toJson();
- }
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index c42c088b6a..92725fabc0 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -470,19 +470,6 @@ public class MetadataSchema {
}
- public static class ExternalCompactionSection {
- private static final Section section =
- new Section(RESERVED_PREFIX + "ecomp", true, RESERVED_PREFIX + "ecomq", false);
-
- public static Range getRange() {
- return section.getRange();
- }
-
- public static String getRowPrefix() {
- return section.getRowPrefix();
- }
- }
-
public static class ScanServerFileReferenceSection {
private static final Section section =
new Section(RESERVED_PREFIX + "sserv", true, RESERVED_PREFIX + "sserx", false);
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java
index 05e37b5584..3a5d93e34a 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java
@@ -96,7 +96,7 @@ public class ClusterServerConfiguration {
while (iter.hasNext()) {
String resourceGroup = iter.next();
if (!resourceGroup.equals(ServiceLockData.ServiceDescriptor.DEFAULT_GROUP_NAME)) {
- compactors.remove(resourceGroup);
+ iter.remove();
}
}
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index 25f4ff12eb..dd9ea397c4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -50,12 +50,9 @@ import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.ValidationUtil;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.AmpleImpl;
-import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState;
-import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.BlipSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection.SkewedKeyValue;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ExternalCompactionSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ScanServerFileReferenceSection;
import org.apache.accumulo.core.metadata.schema.TabletMutatorBase;
import org.apache.accumulo.core.security.Authorizations;
@@ -246,55 +243,6 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
return delFlag;
}
- @Override
- public void
- putExternalCompactionFinalStates(Collection<ExternalCompactionFinalState> finalStates) {
- try (BatchWriter writer = context.createBatchWriter(DataLevel.USER.metaTable())) {
- String prefix = ExternalCompactionSection.getRowPrefix();
- for (ExternalCompactionFinalState finalState : finalStates) {
- Mutation m = new Mutation(prefix + finalState.getExternalCompactionId().canonical());
- m.put("", "", finalState.toJson());
- writer.addMutation(m);
- }
- } catch (MutationsRejectedException | TableNotFoundException e) {
- throw new IllegalStateException(e);
- }
- }
-
- @Override
- public Stream<ExternalCompactionFinalState> getExternalCompactionFinalStates() {
- Scanner scanner;
- try {
- scanner = context.createScanner(DataLevel.USER.metaTable(), Authorizations.EMPTY);
- } catch (TableNotFoundException e) {
- throw new IllegalStateException(e);
- }
-
- scanner.setRange(ExternalCompactionSection.getRange());
- int pLen = ExternalCompactionSection.getRowPrefix().length();
- return scanner.stream()
- .map(e -> ExternalCompactionFinalState.fromJson(
- ExternalCompactionId.of(e.getKey().getRowData().toString().substring(pLen)),
- e.getValue().toString()));
- }
-
- @Override
- public void
- deleteExternalCompactionFinalStates(Collection<ExternalCompactionId> statusesToDelete) {
- try (BatchWriter writer = context.createBatchWriter(DataLevel.USER.metaTable())) {
- String prefix = ExternalCompactionSection.getRowPrefix();
- for (ExternalCompactionId ecid : statusesToDelete) {
- Mutation m = new Mutation(prefix + ecid.canonical());
- m.putDelete(EMPTY_TEXT, EMPTY_TEXT);
- writer.addMutation(m);
- }
- log.debug("Deleted external compaction final state entries for external compactions: {}",
- statusesToDelete);
- } catch (MutationsRejectedException | TableNotFoundException e) {
- throw new IllegalStateException(e);
- }
- }
-
@Override
public void putScanServerFileReferences(Collection<ScanServerRefTabletFile> scanRefs) {
try (BatchWriter writer = context.createBatchWriter(DataLevel.USER.metaTable())) {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
index b8bd5b4562..98540e1b93 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
@@ -98,7 +98,7 @@ public class DeadCompactionDetector {
running.forEach((ecid) -> {
if (tabletCompactions.remove(ecid) != null) {
- log.debug("Removed compaction {} running on a compactor", ecid);
+ log.debug("Ignoring compaction {} that is running on a compactor", ecid);
}
if (this.deadCompactions.remove(ecid) != null) {
log.debug("Removed {} from the dead compaction map, it's running on a compactor", ecid);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
index 1d5a56804d..f1d11712d0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
@@ -18,18 +18,30 @@
*/
package org.apache.accumulo.manager.upgrade;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.RESERVED_PREFIX;
+
+import java.util.Map;
+
import org.apache.accumulo.core.client.admin.TabletHostingGoal;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.core.schema.Section;
import org.apache.accumulo.server.ServerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
public class Upgrader11to12 implements Upgrader {
private static final Logger LOG = LoggerFactory.getLogger(Upgrader11to12.class);
@@ -50,6 +62,33 @@ public class Upgrader11to12 implements Upgrader {
public void upgradeMetadata(ServerContext context) {
LOG.info("setting hosting goal on user tables");
addHostingGoalToUserTables(context);
+ deleteExternalCompactionFinalStates(context);
+ deleteExternalCompactions(context);
+ }
+
+ private void deleteExternalCompactionFinalStates(ServerContext context) {
+ // This metadata was only written for user tablets as part of the compaction commit process.
+ // Compactions are committed in a completely different way now, so delete these entries. It's
+ // possible some completed compactions may need to be redone, but processing these entries would
+ // not be easy to test so it's better for correctness to delete them and redo the work.
+ try (var scanner = context.createScanner(MetadataTable.NAME);
+ var writer = context.createBatchWriter(MetadataTable.NAME)) {
+ var section = new Section(RESERVED_PREFIX + "ecomp", true, RESERVED_PREFIX + "ecomq", false);
+ scanner.setRange(section.getRange());
+
+ for (Map.Entry<Key,Value> entry : scanner) {
+ var key = entry.getKey();
+ var row = key.getRow();
+ Preconditions.checkState(row.toString().startsWith(section.getRowPrefix()));
+ Mutation m = new Mutation(row);
+ Preconditions.checkState(key.getColumnVisibilityData().length() == 0,
+ "Expected empty visibility, saw %s ", key.getColumnVisibilityData());
+ m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+ writer.addMutation(m);
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
}
private void addHostingGoalToSystemTable(ServerContext context, TableId tableId) {
@@ -57,7 +96,8 @@ public class Upgrader11to12 implements Upgrader {
TabletsMetadata tm =
context.getAmple().readTablets().forTable(tableId).fetch(ColumnType.PREV_ROW).build();
TabletsMutator mut = context.getAmple().mutateTablets()) {
- tm.forEach(t -> mut.mutateTablet(t.getExtent()).putHostingGoal(TabletHostingGoal.ALWAYS));
+ tm.forEach(
+ t -> mut.mutateTablet(t.getExtent()).putHostingGoal(TabletHostingGoal.ALWAYS).mutate());
}
}
@@ -74,8 +114,34 @@ public class Upgrader11to12 implements Upgrader {
TabletsMetadata tm = context.getAmple().readTablets().forLevel(DataLevel.USER)
.fetch(ColumnType.PREV_ROW).build();
TabletsMutator mut = context.getAmple().mutateTablets()) {
- tm.forEach(t -> mut.mutateTablet(t.getExtent()).putHostingGoal(TabletHostingGoal.ONDEMAND));
+ tm.forEach(
+ t -> mut.mutateTablet(t.getExtent()).putHostingGoal(TabletHostingGoal.ONDEMAND).mutate());
}
}
+ private void deleteExternalCompactions(ServerContext context) {
+ // External compactions were only written for user tablets in 3.x and earlier, so only need to
+ // process the metadata table. The metadata related to an external compaction has changed so
+ // delete any that exist. Not using Ample in case there are problems deserializing the old
+ // external compaction metadata.
+ try (var scanner = context.createScanner(MetadataTable.NAME);
+ var writer = context.createBatchWriter(MetadataTable.NAME)) {
+ scanner.setRange(TabletsSection.getRange());
+ scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+
+ for (Map.Entry<Key,Value> entry : scanner) {
+ var key = entry.getKey();
+ Mutation m = new Mutation(key.getRow());
+ Preconditions.checkState(key.getColumnFamily().equals(ExternalCompactionColumnFamily.NAME),
+ "Expected family %s, saw %s ", ExternalCompactionColumnFamily.NAME,
+ key.getColumnFamily());
+ Preconditions.checkState(key.getColumnVisibilityData().length() == 0,
+ "Expected empty visibility, saw %s ", key.getColumnVisibilityData());
+ m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+ writer.addMutation(m);
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index ec3ed5d71c..aafea306d1 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -1410,7 +1410,9 @@ public class CompactableImpl implements Compactable {
extCompactionId);
}
- tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(extCompactionId));
+ throw new UnsupportedOperationException(
+ "This code no longer functions properly and needs to be removed");
+ // tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(extCompactionId));
} finally {
synchronized (this) {
Preconditions.checkState(externalCompactionsCommitting.remove(extCompactionId));
@@ -1445,7 +1447,9 @@ public class CompactableImpl implements Compactable {
log.debug("Ignoring request to fail external compaction that is unknown {}", ecid);
}
- tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(ecid));
+ throw new UnsupportedOperationException(
+ "This code no longer functions properly and needs to be removed");
+ // tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(ecid));
} finally {
synchronized (this) {
Preconditions.checkState(externalCompactionsCommitting.remove(ecid));
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTServer.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTServer.java
deleted file mode 100644
index 63fa29e156..0000000000
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTServer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.test.compaction;
-
-import org.apache.accumulo.core.cli.ConfigOpts;
-import org.apache.accumulo.server.zookeeper.TransactionWatcher;
-import org.apache.accumulo.tserver.TabletClientHandler;
-import org.apache.accumulo.tserver.TabletServer;
-import org.apache.accumulo.tserver.WriteTracker;
-
-public class ExternalCompactionTServer extends TabletServer {
-
- ExternalCompactionTServer(ConfigOpts opts, String[] args) {
- super(opts, args);
- }
-
- @Override
- protected TabletClientHandler newTabletClientHandler(TransactionWatcher watcher,
- WriteTracker writeTracker) {
- return new NonCommittingExternalCompactionTabletClientHandler(this, watcher, writeTracker);
- }
-
- public static void main(String[] args) throws Exception {
- try (
- ExternalCompactionTServer tserver = new ExternalCompactionTServer(new ConfigOpts(), args)) {
- tserver.runServer();
- }
-
- }
-
-}
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
index c0563c9187..26ad0c0f79 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.test.compaction;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.Collections;
@@ -31,9 +32,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.stream.Stream;
+import java.util.stream.Collectors;
-import org.apache.accumulo.cluster.AccumuloCluster;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -54,7 +54,6 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
@@ -68,6 +67,7 @@ import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.compaction.ExternalCompaction_1_IT.TestFilter;
+import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
@@ -93,12 +93,6 @@ public class ExternalCompactionTestUtils {
return String.format("r:%04d", r);
}
- public static Stream<ExternalCompactionFinalState> getFinalStatesForTable(AccumuloCluster cluster,
- TableId tid) {
- return cluster.getServerContext().getAmple().getExternalCompactionFinalStates()
- .filter(state -> state.getExtent().tableId().equals(tid));
- }
-
public static void compact(final AccumuloClient client, String table1, int modulus,
String expectedQueue, boolean wait)
throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
@@ -298,6 +292,21 @@ public class ExternalCompactionTestUtils {
return ecids;
}
+ public static void waitForRunningCompactions(ServerContext ctx, TableId tid,
+ Set<ExternalCompactionId> idsToWaitFor) throws Exception {
+
+ assertTrue(Wait.waitFor(() -> {
+ Set<ExternalCompactionId> seen;
+ try (TabletsMetadata tm =
+ ctx.getAmple().readTablets().forTable(tid).fetch(ColumnType.ECOMP).build()) {
+ seen = tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream())
+ .collect(Collectors.toSet());
+ }
+
+ return Collections.disjoint(seen, idsToWaitFor);
+ }));
+ }
+
public static int confirmCompactionRunning(ServerContext ctx, Set<ExternalCompactionId> ecids)
throws Exception {
int matches = 0;
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
index bda4afdab7..3721960303 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
@@ -18,7 +18,6 @@
*/
package org.apache.accumulo.test.compaction;
-import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE2;
@@ -26,21 +25,17 @@ import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QU
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE4;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE5;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE6;
-import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE7;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE8;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
-import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getFinalStatesForTable;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.verify;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
@@ -48,9 +43,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv;
import org.apache.accumulo.core.client.Accumulo;
@@ -74,20 +67,10 @@ import org.apache.accumulo.core.iterators.Filter;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState;
-import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState.FinalState;
-import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
-import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
-import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
-import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.minicluster.ServerType;
-import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
@@ -202,23 +185,15 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase {
String table1 = this.getUniqueNames(1)[0];
try (AccumuloClient client =
Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
- // Stop the TabletServer so that it does not commit the compaction
- getCluster().getProcesses().get(TABLET_SERVER).forEach(p -> {
- try {
- getCluster().killProcess(TABLET_SERVER, p);
- } catch (Exception e) {
- fail("Failed to shutdown tablet server");
- }
- });
- // Start our TServer that will not commit the compaction
- // ELASTICITY_TODO this will likely no longer work now that compactions do not run in the
- // tserver
- getCluster().getClusterControl().start(TABLET_SERVER, null, 1,
- ExternalCompactionTServer.class);
- getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
createTable(client, table1, "cs3", 2);
writeData(client, table1);
+ verify(client, table1, 1);
+
+ // ELASTICITY_TODO the compactors started by mini inspecting the config were interfering with
+ // starting the ExternalDoNothingCompactor, so killed all compactors. This is not the best way
+ // to handle this.
+ getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(QUEUE3, 1);
getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
@@ -228,19 +203,22 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase {
TableId tid = getCluster().getServerContext().getTableId(table1);
// Wait for the compaction to start by waiting for 1 external compaction column
- ExternalCompactionTestUtils
+ var ecids = ExternalCompactionTestUtils
.waitForCompactionStartAndReturnEcids(getCluster().getServerContext(), tid);
+ assertFalse(ecids.isEmpty());
+
// Kill the compactor
getCluster().getClusterControl().stop(ServerType.COMPACTOR);
- // DeadCompactionDetector in the CompactionCoordinator should fail the compaction.
- long count = 0;
- while (count == 0) {
- count = getFinalStatesForTable(getCluster(), tid)
- .filter(state -> state.getFinalState().equals(FinalState.FAILED)).count();
- UtilWaitThread.sleep(250);
- }
+ // DeadCompactionDetector in the CompactionCoordinator should fail the compaction and delete
+ // it from the tablet.
+ ExternalCompactionTestUtils.waitForRunningCompactions(getCluster().getServerContext(), tid,
+ ecids);
+
+ // If the compaction actually ran it would have filtered data, so let's make sure all the data
+ // written is there. This check provides evidence the compaction did not run.
+ verify(client, table1, 1);
// We need to cancel the compaction or delete the table here because we initiate a user
// compaction above in the test. Even though the external compaction was cancelled
@@ -374,86 +352,6 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase {
}
}
- @Test
- public void testExternalCompactionDeadTServer() throws Exception {
- // Shut down the normal TServers
- getCluster().getProcesses().get(TABLET_SERVER).forEach(p -> {
- try {
- getCluster().killProcess(TABLET_SERVER, p);
- } catch (Exception e) {
- fail("Failed to shutdown tablet server");
- }
- });
- // Start our TServer that will not commit the compaction
- ProcessInfo tserverProcess = getCluster().exec(ExternalCompactionTServer.class);
-
- final String table3 = this.getUniqueNames(1)[0];
-
- try (final AccumuloClient client =
- Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
- createTable(client, table3, "cs7");
- writeData(client, table3);
- getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(QUEUE7, 1);
- getCluster().getClusterControl().start(ServerType.COMPACTOR);
- compact(client, table3, 2, QUEUE7, false);
-
- // ExternalCompactionTServer will not commit the compaction. Wait for the
- // metadata table entries to show up.
- LOG.info("Waiting for external compaction to complete.");
- TableId tid = getCluster().getServerContext().getTableId(table3);
- Stream<ExternalCompactionFinalState> fs = getFinalStatesForTable(getCluster(), tid);
- while (fs.findAny().isEmpty()) {
- LOG.info("Waiting for compaction completed marker to appear");
- UtilWaitThread.sleep(250);
- fs = getFinalStatesForTable(getCluster(), tid);
- }
-
- LOG.info("Validating metadata table contents.");
- TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid)
- .fetch(ColumnType.ECOMP).build();
- List<TabletMetadata> md = new ArrayList<>();
- tm.forEach(t -> md.add(t));
- assertEquals(1, md.size());
- TabletMetadata m = md.get(0);
- Map<ExternalCompactionId,ExternalCompactionMetadata> em = m.getExternalCompactions();
- assertEquals(1, em.size());
- List<ExternalCompactionFinalState> finished = new ArrayList<>();
- getFinalStatesForTable(getCluster(), tid).forEach(f -> finished.add(f));
- assertEquals(1, finished.size());
- assertEquals(em.entrySet().iterator().next().getKey(),
- finished.get(0).getExternalCompactionId());
- tm.close();
-
- // Force a flush on the metadata table before killing our tserver
- client.tableOperations().flush(MetadataTable.NAME);
-
- // Stop our TabletServer. Need to perform a normal shutdown so that the WAL is closed
- // normally.
- LOG.info("Stopping our tablet server");
- getCluster().stopProcessWithTimeout(tserverProcess.getProcess(), 30, TimeUnit.SECONDS);
- getCluster().getClusterControl().stop(ServerType.TABLET_SERVER);
-
- // Start a TabletServer to commit the compaction.
- LOG.info("Starting normal tablet server");
- getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
-
- // Wait for the compaction to be committed.
- LOG.info("Waiting for compaction completed marker to disappear");
- Stream<ExternalCompactionFinalState> fs2 = getFinalStatesForTable(getCluster(), tid);
- while (fs2.findAny().isPresent()) {
- LOG.info("Waiting for compaction completed marker to disappear");
- UtilWaitThread.sleep(500);
- fs2 = getFinalStatesForTable(getCluster(), tid);
- }
- verify(client, table3, 2);
-
- // We need to cancel the compaction or delete the table here because we initiate a user
- // compaction above in the test. Even though the external compaction was cancelled
- // because we split the table, FaTE will continue to queue up a compaction
- client.tableOperations().cancelCompaction(table3);
- }
- }
-
public static class FSelector implements CompactionSelector {
@Override
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
index f946f1d901..969c164fd5 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
@@ -27,32 +27,26 @@ import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.co
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionRunning;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
-import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getFinalStatesForTable;
-import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getRunningCompactions;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
import java.util.Collections;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
-import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
-import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.minicluster.ServerType;
@@ -140,95 +134,6 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase {
}
}
- @Test
- public void testExternalCompactionsSucceedsRunWithTableOffline() throws Exception {
-
- getCluster().getClusterControl().stop(ServerType.COMPACTOR);
-
- String table1 = this.getUniqueNames(1)[0];
- try (AccumuloClient client =
- Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
-
- createTable(client, table1, "cs2");
- // set compaction ratio to 1 so that majc occurs naturally, not user compaction
- // user compaction blocks merge
- client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.toString(), "1.0");
- // cause multiple rfiles to be created
- writeData(client, table1);
- writeData(client, table1);
- writeData(client, table1);
- writeData(client, table1);
-
- TableId tid = getCluster().getServerContext().getTableId(table1);
- // Confirm that no final state is in the metadata table
- assertEquals(0, getFinalStatesForTable(getCluster(), tid).count());
-
- // Offline the table when the compaction starts
- final AtomicBoolean succeededInTakingOffline = new AtomicBoolean(false);
- Thread t = new Thread(() -> {
- try (AccumuloClient client2 =
- Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
- TExternalCompactionList metrics2 = getRunningCompactions(getCluster().getServerContext());
- while (metrics2.getCompactions() == null) {
- metrics2 = getRunningCompactions(getCluster().getServerContext());
- if (metrics2.getCompactions() == null) {
- UtilWaitThread.sleep(50);
- }
- }
- LOG.info("Taking table offline");
- client2.tableOperations().offline(table1, false);
- succeededInTakingOffline.set(true);
- } catch (Exception e) {
- LOG.error("Error: ", e);
- }
- });
- t.start();
-
- // Start the compactor
- getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(QUEUE1, 1);
- getCluster().getClusterControl().start(ServerType.COMPACTOR);
-
- // Wait for the compaction to start by waiting for 1 external compaction column
- Set<ExternalCompactionId> ecids = ExternalCompactionTestUtils
- .waitForCompactionStartAndReturnEcids(getCluster().getServerContext(), tid);
-
- // Confirm that this ECID shows up in RUNNING set
- int matches = ExternalCompactionTestUtils
- .confirmCompactionRunning(getCluster().getServerContext(), ecids);
- assertTrue(matches > 0);
-
- t.join();
- if (!succeededInTakingOffline.get()) {
- fail("Failed to offline table");
- }
-
- confirmCompactionCompleted(getCluster().getServerContext(), ecids,
- TCompactionState.SUCCEEDED);
-
- // Confirm that final state is in the metadata table
- assertEquals(1, getFinalStatesForTable(getCluster(), tid).count());
-
- // Online the table
- client.tableOperations().online(table1);
-
- // wait for compaction to be committed by tserver or test timeout
- long finalStateCount = getFinalStatesForTable(getCluster(), tid).count();
- while (finalStateCount > 0) {
- finalStateCount = getFinalStatesForTable(getCluster(), tid).count();
- if (finalStateCount > 0) {
- UtilWaitThread.sleep(50);
- }
- }
-
- // We need to cancel the compaction or delete the table here because we initiate a user
- // compaction above in the test. Even though the external compaction was cancelled
- // because we split the table, FaTE will continue to queue up a compaction
- client.tableOperations().delete(table1);
-
- getCluster().getClusterControl().stop(ServerType.COMPACTOR);
- }
- }
-
@Test
public void testUserCompactionCancellation() throws Exception {