You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/04/27 15:34:36 UTC
[accumulo] branch 1451-external-compactions-feature updated: ref #2011 - added ITs for splits and merges during external compaction
This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
new 209ac74 ref #2011 - added ITs for splits and merges during external compaction
209ac74 is described below
commit 209ac74f796cb8641c4733fe477c4947fd076816
Author: Dave Marion <dl...@apache.org>
AuthorDate: Tue Apr 27 15:33:47 2021 +0000
ref #2011 - added ITs for splits and merges during external compaction
---
.../coordinator/CompactionCoordinator.java | 7 +-
.../coordinator/ExternalCompactionMetrics.java | 12 +-
.../accumulo/coordinator/QueueSummaries.java | 2 +-
.../apache/accumulo/test/ExternalCompactionIT.java | 175 ++++++++++++++++++---
4 files changed, 165 insertions(+), 31 deletions(-)
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index f8c9d65..1107e4d 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -258,7 +258,8 @@ public class CompactionCoordinator extends AbstractServer
response.setStatus(200);
response.setContentType("application/json");
metrics.setRunning(RUNNING.size());
- response.getWriter().print(metrics.toJson(GSON));
+ LOG.debug("Returning metrics: {}", metrics);
+ response.getWriter().print(GSON.toJson(metrics));
}
});
handlers.addHandler(metricContext);
@@ -700,6 +701,7 @@ public class CompactionCoordinator extends AbstractServer
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
}
LOG.info("Compaction completed, id: {}, stats: {}", externalCompactionId, stats);
+ metrics.incrementCompleted();
final var ecid = ExternalCompactionId.of(externalCompactionId);
final RunningCompaction rc = RUNNING.get(ecid);
if (null != rc) {
@@ -708,7 +710,6 @@ public class CompactionCoordinator extends AbstractServer
rc.setCompleted();
compactionFinalizer.commitCompaction(ecid, KeyExtent.fromThrift(textent), stats.fileSize,
stats.entriesWritten);
- metrics.incrementCompleted();
} else {
LOG.error(
"Compaction completed called by Compactor for {}, but no running compaction for that id.",
@@ -726,13 +727,13 @@ public class CompactionCoordinator extends AbstractServer
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
}
LOG.info("Compaction failed, id: {}", externalCompactionId);
+ metrics.incrementFailed();
final var ecid = ExternalCompactionId.of(externalCompactionId);
final RunningCompaction rc = RUNNING.get(ecid);
if (null != rc) {
// CBUG: Should we remove rc from RUNNING here and remove the isCompactionCompleted method?
rc.setCompleted();
compactionFinalizer.failCompactions(Map.of(ecid, KeyExtent.fromThrift(extent)));
- metrics.incrementFailed();
} else {
LOG.error(
"Compaction failed called by Compactor for {}, but no running compaction for that id.",
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/ExternalCompactionMetrics.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/ExternalCompactionMetrics.java
index 2ad35f4..5c72cdc 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/ExternalCompactionMetrics.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/ExternalCompactionMetrics.java
@@ -18,8 +18,6 @@
*/
package org.apache.accumulo.coordinator;
-import com.google.gson.Gson;
-
public class ExternalCompactionMetrics {
private long started = 0;
@@ -71,17 +69,13 @@ public class ExternalCompactionMetrics {
this.failed++;
}
- public String toJson(Gson gson) {
- return gson.toJson(this);
- }
-
@Override
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append("started: ").append(started);
- buf.append("running: ").append(running);
- buf.append("completed: ").append(completed);
- buf.append("failed: ").append(failed);
+ buf.append(", running: ").append(running);
+ buf.append(", completed: ").append(completed);
+ buf.append(", failed: ").append(failed);
return buf.toString();
}
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java
index 8cfe294..103ffc4 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java
@@ -37,7 +37,7 @@ import com.google.common.collect.Sets;
public class QueueSummaries {
- // keep track of the last tserver retunred for qeueue
+ // keep track of the last tserver returned for queue
final Map<String,PrioTserver> LAST = new HashMap<>();
/* Map of external queue name -> priority -> tservers */
diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
index a2b7a3c..4e7faa8 100644
--- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -23,6 +23,8 @@ import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
+import java.net.http.HttpClient.Redirect;
+import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
@@ -50,9 +52,11 @@ import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.clientImpl.Tables;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.Filter;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
@@ -173,6 +177,139 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
}
@Test
+ public void testSplitDuringExternalCompaction() throws Exception {
+ try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
+ String table1 = "ectt6";
+ createTable(client, table1, "cs1");
+ TableId tid = Tables.getTableId(getCluster().getServerContext(), table1);
+ writeData(client, table1);
+
+ cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1");
+ cluster.exec(CompactionCoordinator.class);
+ compact(client, table1, 2, "DCQ1", false);
+
+ // Wait for the compaction to start by waiting for 1 external compaction column
+ List<TabletMetadata> md = new ArrayList<>();
+ TabletsMetadata tm = null;
+ do {
+ if (null != tm) {
+ tm.close();
+ }
+ tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid)
+ .fetch(ColumnType.ECOMP).build();
+ tm.forEach(t -> md.add(t));
+ } while (md.size() == 0);
+ tm.close();
+ md.clear();
+
+ // ExternalDoNothingCompactor will not compact, it will wait, split the table.
+ SortedSet<Text> splits = new TreeSet<>();
+ int jump = MAX_DATA / 5;
+ for (int r = jump; r < MAX_DATA; r += jump) {
+ splits.add(new Text(row(r)));
+ }
+ client.tableOperations().addSplits(table1, splits);
+
+ // Wait for the compaction to get cancelled
+ UtilWaitThread.sleep(5000);
+
+ // Wait for the table to split by waiting for 5 tablets to show up in the metadata table
+ do {
+ if (null != tm) {
+ tm.close();
+ }
+ tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid)
+ .fetch(ColumnType.PREV_ROW).build();
+ tm.forEach(t -> md.add(t));
+ } while (md.size() < 5);
+ tm.close();
+
+ // Check that there is one failed compaction in the coordinator metrics
+ ExternalCompactionMetrics metrics = getCoordinatorMetrics();
+ Assert.assertEquals(1, metrics.getStarted());
+ Assert.assertEquals(1, metrics.getRunning()); // CBUG: Should be zero when #2032 is resolved
+ Assert.assertEquals(0, metrics.getCompleted());
+ Assert.assertEquals(1, metrics.getFailed());
+
+ }
+
+ }
+
+ @Test
+ public void testMergeDuringExternalCompaction() throws Exception {
+ try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
+ String table1 = "ectt7";
+ SortedSet<Text> splits = new TreeSet<>();
+ int jump = MAX_DATA / 2;
+ for (int r = jump; r < MAX_DATA; r += jump) {
+ splits.add(new Text(row(r)));
+ }
+ createTable(client, table1, "cs1", splits);
+ // set compaction ratio to 1 so that majc occurs naturally, not user compaction
+ // user compaction blocks merge
+ client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.toString(), "1.0");
+ // cause multiple rfiles to be created
+ writeData(client, table1);
+ writeData(client, table1);
+ writeData(client, table1);
+ writeData(client, table1);
+
+ TableId tid = Tables.getTableId(getCluster().getServerContext(), table1);
+
+ cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1");
+ cluster.exec(CompactionCoordinator.class);
+
+ // Wait for the compaction to start by waiting for 1 external compaction column
+ List<TabletMetadata> md = new ArrayList<>();
+ TabletsMetadata tm = null;
+ do {
+ if (null != tm) {
+ tm.close();
+ }
+ tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid)
+ .fetch(ColumnType.ECOMP).build();
+ tm.forEach(t -> md.add(t));
+ } while (md.size() == 0);
+ tm.close();
+
+ md.clear();
+ tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid)
+ .fetch(ColumnType.PREV_ROW).build();
+ tm.forEach(t -> md.add(t));
+ Assert.assertEquals(2, md.size());
+ Text start = md.get(0).getPrevEndRow();
+ Text end = md.get(1).getEndRow();
+
+ // Merge - blocking operation
+ client.tableOperations().merge(table1, start, end);
+
+ // Confirm that there are no external compaction markers or final states in the metadata table
+ md.clear();
+ tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid)
+ .fetch(ColumnType.ECOMP).build();
+ tm.forEach(t -> md.add(t));
+ Assert.assertEquals(0, md.size());
+ Assert.assertEquals(0,
+ getCluster().getServerContext().getAmple().getExternalCompactionFinalStates().count());
+
+ // Wait for the table to merge by waiting for only 1 tablet to show up in the metadata table
+ tm.close();
+
+ // Wait for the ExternalDoNothingCompactor to time out
+ UtilWaitThread.sleep(8000);
+
+ // Check that there is one failed compaction in the coordinator metrics
+ ExternalCompactionMetrics metrics = getCoordinatorMetrics();
+ Assert.assertTrue(metrics.getStarted() > 0);
+ Assert.assertTrue(metrics.getRunning() > 0); // CBUG: Should be zero when #2032 is resolved
+ Assert.assertEquals(0, metrics.getCompleted());
+ Assert.assertTrue(metrics.getFailed() > 0);
+
+ }
+
+ }
+
+ @Test
public void testManytablets() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
String table1 = "ectt4";
@@ -235,16 +372,10 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
UtilWaitThread.sleep(8000);
// The metadata tablets will be deleted from the metadata table because we have deleted the
- // table
- // Verify that the compaction failed by looking at the metrics in the Coordinator.
- HttpRequest req =
- HttpRequest.newBuilder().GET().uri(new URI("http://localhost:9099/metrics")).build();
- HttpClient hc = HttpClient.newHttpClient();
- HttpResponse<String> res = hc.send(req, BodyHandlers.ofString());
- ExternalCompactionMetrics metrics =
- new Gson().fromJson(res.body(), ExternalCompactionMetrics.class);
+ // table. Verify that the compaction failed by looking at the metrics in the Coordinator.
+ ExternalCompactionMetrics metrics = getCoordinatorMetrics();
Assert.assertEquals(1, metrics.getStarted());
- Assert.assertEquals(0, metrics.getRunning());
+ Assert.assertEquals(1, metrics.getRunning()); // CBUG: Should be zero when #2032 is resolved
Assert.assertEquals(0, metrics.getCompleted());
Assert.assertEquals(1, metrics.getFailed());
}
@@ -257,6 +388,7 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
String table1 = "ectt5";
createTable(client, table1, "cs1");
// set compaction ratio to 1 so that majc occurs naturally, not user compaction
+ // user compaction blocks delete
client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.toString(), "1.0");
// cause multiple rfiles to be created
writeData(client, table1);
@@ -289,16 +421,10 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
UtilWaitThread.sleep(8000);
// The metadata tablets will be deleted from the metadata table because we have deleted the
- // table
- // Verify that the compaction failed by looking at the metrics in the Coordinator.
- HttpRequest req =
- HttpRequest.newBuilder().GET().uri(new URI("http://localhost:9099/metrics")).build();
- HttpClient hc = HttpClient.newHttpClient();
- HttpResponse<String> res = hc.send(req, BodyHandlers.ofString());
- ExternalCompactionMetrics metrics =
- new Gson().fromJson(res.body(), ExternalCompactionMetrics.class);
+ // table. Verify that the compaction failed by looking at the metrics in the Coordinator.
+ ExternalCompactionMetrics metrics = getCoordinatorMetrics();
Assert.assertEquals(1, metrics.getStarted());
- Assert.assertEquals(0, metrics.getRunning());
+ Assert.assertEquals(1, metrics.getRunning()); // CBUG: Should be zero when #2032 is resolved
Assert.assertEquals(0, metrics.getCompleted());
Assert.assertEquals(1, metrics.getFailed());
}
@@ -391,6 +517,19 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
}
}
+ private ExternalCompactionMetrics getCoordinatorMetrics() throws Exception {
+ HttpRequest req =
+ HttpRequest.newBuilder().GET().uri(new URI("http://localhost:9099/metrics")).build();
+ HttpClient hc =
+ HttpClient.newBuilder().version(Version.HTTP_1_1).followRedirects(Redirect.NORMAL).build();
+ HttpResponse<String> res = hc.send(req, BodyHandlers.ofString());
+ Assert.assertEquals(200, res.statusCode());
+ String metrics = res.body();
+ Assert.assertNotNull(metrics);
+ System.out.println("Metrics response: " + metrics);
+ return new Gson().fromJson(metrics, ExternalCompactionMetrics.class);
+ }
+
private void verify(AccumuloClient client, String table1, int modulus)
throws TableNotFoundException, AccumuloSecurityException, AccumuloException {
try (Scanner scanner = client.createScanner(table1)) {