You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/04/27 15:34:36 UTC

[accumulo] branch 1451-external-compactions-feature updated: ref #2011 - added ITs for splits and merges during external compaction

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new 209ac74  ref #2011 - added ITs for splits and merges during external compaction
209ac74 is described below

commit 209ac74f796cb8641c4733fe477c4947fd076816
Author: Dave Marion <dl...@apache.org>
AuthorDate: Tue Apr 27 15:33:47 2021 +0000

    ref #2011 - added ITs for splits and merges during external compaction
---
 .../coordinator/CompactionCoordinator.java         |   7 +-
 .../coordinator/ExternalCompactionMetrics.java     |  12 +-
 .../accumulo/coordinator/QueueSummaries.java       |   2 +-
 .../apache/accumulo/test/ExternalCompactionIT.java | 175 ++++++++++++++++++---
 4 files changed, 165 insertions(+), 31 deletions(-)

diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index f8c9d65..1107e4d 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -258,7 +258,8 @@ public class CompactionCoordinator extends AbstractServer
         response.setStatus(200);
         response.setContentType("application/json");
         metrics.setRunning(RUNNING.size());
-        response.getWriter().print(metrics.toJson(GSON));
+        LOG.debug("Returning metrics: {}", metrics);
+        response.getWriter().print(GSON.toJson(metrics));
       }
     });
     handlers.addHandler(metricContext);
@@ -700,6 +701,7 @@ public class CompactionCoordinator extends AbstractServer
           SecurityErrorCode.PERMISSION_DENIED).asThriftException();
     }
     LOG.info("Compaction completed, id: {}, stats: {}", externalCompactionId, stats);
+    metrics.incrementCompleted();
     final var ecid = ExternalCompactionId.of(externalCompactionId);
     final RunningCompaction rc = RUNNING.get(ecid);
     if (null != rc) {
@@ -708,7 +710,6 @@ public class CompactionCoordinator extends AbstractServer
       rc.setCompleted();
       compactionFinalizer.commitCompaction(ecid, KeyExtent.fromThrift(textent), stats.fileSize,
           stats.entriesWritten);
-      metrics.incrementCompleted();
     } else {
       LOG.error(
           "Compaction completed called by Compactor for {}, but no running compaction for that id.",
@@ -726,13 +727,13 @@ public class CompactionCoordinator extends AbstractServer
           SecurityErrorCode.PERMISSION_DENIED).asThriftException();
     }
     LOG.info("Compaction failed, id: {}", externalCompactionId);
+    metrics.incrementFailed();
     final var ecid = ExternalCompactionId.of(externalCompactionId);
     final RunningCompaction rc = RUNNING.get(ecid);
     if (null != rc) {
       // CBUG: Should we remove rc from RUNNING here and remove the isCompactionCompleted method?
       rc.setCompleted();
       compactionFinalizer.failCompactions(Map.of(ecid, KeyExtent.fromThrift(extent)));
-      metrics.incrementFailed();
     } else {
       LOG.error(
           "Compaction failed called by Compactor for {}, but no running compaction for that id.",
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/ExternalCompactionMetrics.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/ExternalCompactionMetrics.java
index 2ad35f4..5c72cdc 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/ExternalCompactionMetrics.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/ExternalCompactionMetrics.java
@@ -18,8 +18,6 @@
  */
 package org.apache.accumulo.coordinator;
 
-import com.google.gson.Gson;
-
 public class ExternalCompactionMetrics {
 
   private long started = 0;
@@ -71,17 +69,13 @@ public class ExternalCompactionMetrics {
     this.failed++;
   }
 
-  public String toJson(Gson gson) {
-    return gson.toJson(this);
-  }
-
   @Override
   public String toString() {
     StringBuilder buf = new StringBuilder();
     buf.append("started: ").append(started);
-    buf.append("running: ").append(running);
-    buf.append("completed: ").append(completed);
-    buf.append("failed: ").append(failed);
+    buf.append(", running: ").append(running);
+    buf.append(", completed: ").append(completed);
+    buf.append(", failed: ").append(failed);
     return buf.toString();
   }
 
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java
index 8cfe294..103ffc4 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java
@@ -37,7 +37,7 @@ import com.google.common.collect.Sets;
 
 public class QueueSummaries {
 
-  // keep track of the last tserver retunred for qeueue
+  // keep track of the last tserver returned for queue
   final Map<String,PrioTserver> LAST = new HashMap<>();
 
   /* Map of external queue name -> priority -> tservers */
diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
index a2b7a3c..4e7faa8 100644
--- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -23,6 +23,8 @@ import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
 import java.io.IOException;
 import java.net.URI;
 import java.net.http.HttpClient;
+import java.net.http.HttpClient.Redirect;
+import java.net.http.HttpClient.Version;
 import java.net.http.HttpRequest;
 import java.net.http.HttpResponse;
 import java.net.http.HttpResponse.BodyHandlers;
@@ -50,9 +52,11 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.clientImpl.Tables;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.Filter;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
@@ -173,6 +177,139 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
   }
 
   @Test
+  public void testSplitDuringExternalCompaction() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
+      String table1 = "ectt6";
+      createTable(client, table1, "cs1");
+      TableId tid = Tables.getTableId(getCluster().getServerContext(), table1);
+      writeData(client, table1);
+
+      cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1");
+      cluster.exec(CompactionCoordinator.class);
+      compact(client, table1, 2, "DCQ1", false);
+
+      // Wait for the compaction to start by waiting for 1 external compaction column
+      List<TabletMetadata> md = new ArrayList<>();
+      TabletsMetadata tm = null;
+      do {
+        if (null != tm) {
+          tm.close();
+        }
+        tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid)
+            .fetch(ColumnType.ECOMP).build();
+        tm.forEach(t -> md.add(t));
+      } while (md.size() == 0);
+      tm.close();
+      md.clear();
+
+      // ExternalDoNothingCompactor will not compact, it will wait, split the table.
+      SortedSet<Text> splits = new TreeSet<>();
+      int jump = MAX_DATA / 5;
+      for (int r = jump; r < MAX_DATA; r += jump) {
+        splits.add(new Text(row(r)));
+      }
+      client.tableOperations().addSplits(table1, splits);
+
+      // Wait for the compaction to get cancelled
+      UtilWaitThread.sleep(5000);
+
+      // Wait for the table to split by waiting for 5 tablets to show up in the metadata table
+      do {
+        if (null != tm) {
+          tm.close();
+        }
+        tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid)
+            .fetch(ColumnType.PREV_ROW).build();
+        tm.forEach(t -> md.add(t));
+      } while (md.size() < 5);
+      tm.close();
+
+      // Check that there is one failed compaction in the coordinator metrics
+      ExternalCompactionMetrics metrics = getCoordinatorMetrics();
+      Assert.assertEquals(1, metrics.getStarted());
+      Assert.assertEquals(1, metrics.getRunning()); // CBUG: Should be zero when #2032 is resolved
+      Assert.assertEquals(0, metrics.getCompleted());
+      Assert.assertEquals(1, metrics.getFailed());
+
+    }
+
+  }
+
+  @Test
+  public void testMergeDuringExternalCompaction() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
+      String table1 = "ectt7";
+      SortedSet<Text> splits = new TreeSet<>();
+      int jump = MAX_DATA / 2;
+      for (int r = jump; r < MAX_DATA; r += jump) {
+        splits.add(new Text(row(r)));
+      }
+      createTable(client, table1, "cs1", splits);
+      // set compaction ratio to 1 so that majc occurs naturally, not user compaction
+      // user compaction blocks merge
+      client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.toString(), "1.0");
+      // cause multiple rfiles to be created
+      writeData(client, table1);
+      writeData(client, table1);
+      writeData(client, table1);
+      writeData(client, table1);
+
+      TableId tid = Tables.getTableId(getCluster().getServerContext(), table1);
+
+      cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1");
+      cluster.exec(CompactionCoordinator.class);
+
+      // Wait for the compaction to start by waiting for 1 external compaction column
+      List<TabletMetadata> md = new ArrayList<>();
+      TabletsMetadata tm = null;
+      do {
+        if (null != tm) {
+          tm.close();
+        }
+        tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid)
+            .fetch(ColumnType.ECOMP).build();
+        tm.forEach(t -> md.add(t));
+      } while (md.size() == 0);
+      tm.close();
+
+      md.clear();
+      tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid)
+          .fetch(ColumnType.PREV_ROW).build();
+      tm.forEach(t -> md.add(t));
+      Assert.assertEquals(2, md.size());
+      Text start = md.get(0).getPrevEndRow();
+      Text end = md.get(1).getEndRow();
+
+      // Merge - blocking operation
+      client.tableOperations().merge(table1, start, end);
+
+      // Confirm that there are no external compaction markers or final states in the metadata table
+      md.clear();
+      tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid)
+          .fetch(ColumnType.ECOMP).build();
+      tm.forEach(t -> md.add(t));
+      Assert.assertEquals(0, md.size());
+      Assert.assertEquals(0,
+          getCluster().getServerContext().getAmple().getExternalCompactionFinalStates().count());
+
+      // Wait for the table to merge by waiting for only 1 tablet to show up in the metadata table
+      tm.close();
+
+      // Wait for the ExternalDoNothingCompactor to time out
+      UtilWaitThread.sleep(8000);
+
+      // Check that there is one failed compaction in the coordinator metrics
+      ExternalCompactionMetrics metrics = getCoordinatorMetrics();
+      Assert.assertTrue(metrics.getStarted() > 0);
+      Assert.assertTrue(metrics.getRunning() > 0); // CBUG: Should be zero when #2032 is resolved
+      Assert.assertEquals(0, metrics.getCompleted());
+      Assert.assertTrue(metrics.getFailed() > 0);
+
+    }
+
+  }
+
+  @Test
   public void testManytablets() throws Exception {
     try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
       String table1 = "ectt4";
@@ -235,16 +372,10 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
       UtilWaitThread.sleep(8000);
 
       // The metadata tablets will be deleted from the metadata table because we have deleted the
-      // table
-      // Verify that the compaction failed by looking at the metrics in the Coordinator.
-      HttpRequest req =
-          HttpRequest.newBuilder().GET().uri(new URI("http://localhost:9099/metrics")).build();
-      HttpClient hc = HttpClient.newHttpClient();
-      HttpResponse<String> res = hc.send(req, BodyHandlers.ofString());
-      ExternalCompactionMetrics metrics =
-          new Gson().fromJson(res.body(), ExternalCompactionMetrics.class);
+      // table. Verify that the compaction failed by looking at the metrics in the Coordinator.
+      ExternalCompactionMetrics metrics = getCoordinatorMetrics();
       Assert.assertEquals(1, metrics.getStarted());
-      Assert.assertEquals(0, metrics.getRunning());
+      Assert.assertEquals(1, metrics.getRunning()); // CBUG: Should be zero when #2032 is resolved
       Assert.assertEquals(0, metrics.getCompleted());
       Assert.assertEquals(1, metrics.getFailed());
     }
@@ -257,6 +388,7 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
       String table1 = "ectt5";
       createTable(client, table1, "cs1");
       // set compaction ratio to 1 so that majc occurs naturally, not user compaction
+      // user compaction blocks delete
       client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.toString(), "1.0");
       // cause multiple rfiles to be created
       writeData(client, table1);
@@ -289,16 +421,10 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
       UtilWaitThread.sleep(8000);
 
       // The metadata tablets will be deleted from the metadata table because we have deleted the
-      // table
-      // Verify that the compaction failed by looking at the metrics in the Coordinator.
-      HttpRequest req =
-          HttpRequest.newBuilder().GET().uri(new URI("http://localhost:9099/metrics")).build();
-      HttpClient hc = HttpClient.newHttpClient();
-      HttpResponse<String> res = hc.send(req, BodyHandlers.ofString());
-      ExternalCompactionMetrics metrics =
-          new Gson().fromJson(res.body(), ExternalCompactionMetrics.class);
+      // table. Verify that the compaction failed by looking at the metrics in the Coordinator.
+      ExternalCompactionMetrics metrics = getCoordinatorMetrics();
       Assert.assertEquals(1, metrics.getStarted());
-      Assert.assertEquals(0, metrics.getRunning());
+      Assert.assertEquals(1, metrics.getRunning()); // CBUG: Should be zero when #2032 is resolved
       Assert.assertEquals(0, metrics.getCompleted());
       Assert.assertEquals(1, metrics.getFailed());
     }
@@ -391,6 +517,19 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
     }
   }
 
+  private ExternalCompactionMetrics getCoordinatorMetrics() throws Exception {
+    HttpRequest req =
+        HttpRequest.newBuilder().GET().uri(new URI("http://localhost:9099/metrics")).build();
+    HttpClient hc =
+        HttpClient.newBuilder().version(Version.HTTP_1_1).followRedirects(Redirect.NORMAL).build();
+    HttpResponse<String> res = hc.send(req, BodyHandlers.ofString());
+    Assert.assertEquals(200, res.statusCode());
+    String metrics = res.body();
+    Assert.assertNotNull(metrics);
+    System.out.println("Metrics response: " + metrics);
+    return new Gson().fromJson(metrics, ExternalCompactionMetrics.class);
+  }
+
   private void verify(AccumuloClient client, String table1, int modulus)
       throws TableNotFoundException, AccumuloSecurityException, AccumuloException {
     try (Scanner scanner = client.createScanner(table1)) {