You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2021/04/12 23:36:22 UTC

[accumulo] branch 1451-external-compactions-feature updated: Add external compaction IT

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new b44069b  Add external compaction IT
b44069b is described below

commit b44069bfb99d9fc76b65d11c839fd377f7bb9441
Author: Keith Turner <kt...@apache.org>
AuthorDate: Mon Apr 12 17:45:51 2021 -0400

    Add external compaction IT
---
 .../server/compaction/ExternalCompactionUtil.java  |   4 -
 .../accumulo/compactor/CompactionEnvironment.java  |  26 ++++-
 .../org/apache/accumulo/compactor/Compactor.java   |   3 +-
 test/pom.xml                                       |   8 ++
 .../apache/accumulo/test/ExternalCompactionIT.java | 126 +++++++++++++++++++++
 5 files changed, 159 insertions(+), 8 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
index 38c65a9..26ff813 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
@@ -65,8 +65,6 @@ public class ExternalCompactionUtil {
 
   /**
    *
-   * @param context
-   *          server context
    * @return null if Coordinator node not found, else HostAndPort
    */
   public static HostAndPort findCompactionCoordinator(ServerContext context) {
@@ -85,8 +83,6 @@ public class ExternalCompactionUtil {
   }
 
   /**
-   * @param context
-   *          server context
    * @return list of Compactors
    */
   public static List<HostAndPort> getCompactorAddrs(ServerContext context) {
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
index fad132b..b37a737 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
@@ -36,16 +36,36 @@ import org.apache.accumulo.server.compaction.Compactor.CompactionEnv;
 import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
 import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class CompactionEnvironment implements Closeable, CompactionEnv {
 
   private final ServerContext context;
   private final CompactionJobHolder jobHolder;
   private final SharedRateLimiterFactory limiter;
+  private String queueName;
+
+  public static class CompactorIterEnv extends TabletIteratorEnvironment {
+
+    private String queueName;
+
+    public CompactorIterEnv(ServerContext context, IteratorScope scope, boolean fullMajC,
+        AccumuloConfiguration tableConfig, TableId tableId, CompactionKind kind, String queueName) {
+      super(context, scope, fullMajC, tableConfig, tableId, kind);
+      this.queueName = queueName;
+    }
+
+    @VisibleForTesting
+    public String getQueueName() {
+      return queueName;
+    }
+  }
 
-  CompactionEnvironment(ServerContext context, CompactionJobHolder jobHolder) {
+  CompactionEnvironment(ServerContext context, CompactionJobHolder jobHolder, String queueName) {
     this.context = context;
     this.jobHolder = jobHolder;
     this.limiter = SharedRateLimiterFactory.getInstance(this.context.getConfiguration());
+    this.queueName = queueName;
   }
 
   @Override
@@ -77,9 +97,9 @@ public class CompactionEnvironment implements Closeable, CompactionEnv {
   @Override
   public SystemIteratorEnvironment createIteratorEnv(ServerContext context,
       AccumuloConfiguration acuTableConf, TableId tableId) {
-    return new TabletIteratorEnvironment(context, IteratorScope.majc,
+    return new CompactorIterEnv(context, IteratorScope.majc,
         !jobHolder.getJob().isPropagateDeletes(), acuTableConf, tableId,
-        CompactionKind.valueOf(jobHolder.getJob().getKind().name()));
+        CompactionKind.valueOf(jobHolder.getJob().getKind().name()), queueName);
   }
 
   @Override
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 130cda3..0190820 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -500,7 +500,8 @@ public class Compactor extends AbstractServer
           job.getIteratorSettings().getIterators()
               .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis)));
 
-          try (CompactionEnvironment cenv = new CompactionEnvironment(getContext(), JOB_HOLDER)) {
+          try (CompactionEnvironment cenv =
+              new CompactionEnvironment(getContext(), JOB_HOLDER, queueName)) {
             org.apache.accumulo.server.compaction.Compactor compactor =
                 new org.apache.accumulo.server.compaction.Compactor(getContext(),
                     KeyExtent.fromThrift(job.getExtent()), files, outputFile,
diff --git a/test/pom.xml b/test/pom.xml
index 4f24f05..921425e 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -68,6 +68,14 @@
     </dependency>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-compaction-coordinator</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-compactor</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-core</artifactId>
     </dependency>
     <dependency>
diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
new file mode 100644
index 0000000..1b47f58
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.compactor.CompactionEnvironment.CompactorIterEnv;
+import org.apache.accumulo.compactor.Compactor;
+import org.apache.accumulo.coordinator.CompactionCoordinator;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
+import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.base.Preconditions;
+
+public class ExternalCompactionIT extends ConfigurableMacBase {
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty("tserver.compaction.major.service.cs1.planner",
+        DefaultCompactionPlanner.class.getName());
+    cfg.setProperty("tserver.compaction.major.service.cs1.planner.opts.executors",
+        "[{'name':'all','externalQueue':'DCQ1'}]");
+  }
+
+  public static class TestFilter extends Filter {
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      super.init(source, options, env);
+
+      CompactorIterEnv cienv = (CompactorIterEnv) env;
+
+      Preconditions.checkArgument(!cienv.getQueueName().isEmpty());
+      Preconditions
+          .checkArgument(options.getOrDefault("expectedQ", "").equals(cienv.getQueueName()));
+    }
+
+    @Override
+    public boolean accept(Key k, Value v) {
+      return Integer.parseInt(v.toString()) % 2 == 0;
+    }
+
+  }
+
+  @Test
+  public void testExternalCompaction() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
+      Map<String,String> props =
+          Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(),
+              "table.compaction.dispatcher.opts.service", "cs1");
+      NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props);
+
+      String tableName = "ectt";
+
+      client.tableOperations().create(tableName, ntc);
+
+      try (BatchWriter bw = client.createBatchWriter(tableName)) {
+        for (int i = 0; i < 10; i++) {
+          Mutation m = new Mutation("r:" + i);
+          m.put("", "", "" + i);
+          bw.addMutation(m);
+        }
+      }
+
+      client.tableOperations().flush(tableName);
+
+      cluster.exec(Compactor.class, "-q", "DCQ1");
+      cluster.exec(CompactionCoordinator.class);
+
+      IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class);
+      // make sure iterator options make it to compactor process
+      iterSetting.addOption("expectedQ", "DCQ1");
+      CompactionConfig config =
+          new CompactionConfig().setIterators(List.of(iterSetting)).setWait(true);
+      client.tableOperations().compact(tableName, config);
+
+      try (Scanner scanner = client.createScanner(tableName)) {
+        int count = 0;
+        for (Entry<Key,Value> entry : scanner) {
+          Assert.assertTrue(Integer.parseInt(entry.getValue().toString()) % 2 == 0);
+          count++;
+        }
+
+        Assert.assertEquals(5, count);
+      }
+    }
+  }
+}