You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2021/04/12 23:36:22 UTC
[accumulo] branch 1451-external-compactions-feature updated: Add external compaction IT
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
new b44069b Add external compaction IT
b44069b is described below
commit b44069bfb99d9fc76b65d11c839fd377f7bb9441
Author: Keith Turner <kt...@apache.org>
AuthorDate: Mon Apr 12 17:45:51 2021 -0400
Add external compaction IT
---
.../server/compaction/ExternalCompactionUtil.java | 4 -
.../accumulo/compactor/CompactionEnvironment.java | 26 ++++-
.../org/apache/accumulo/compactor/Compactor.java | 3 +-
test/pom.xml | 8 ++
.../apache/accumulo/test/ExternalCompactionIT.java | 126 +++++++++++++++++++++
5 files changed, 159 insertions(+), 8 deletions(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
index 38c65a9..26ff813 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
@@ -65,8 +65,6 @@ public class ExternalCompactionUtil {
/**
*
- * @param context
- * server context
* @return null if Coordinator node not found, else HostAndPort
*/
public static HostAndPort findCompactionCoordinator(ServerContext context) {
@@ -85,8 +83,6 @@ public class ExternalCompactionUtil {
}
/**
- * @param context
- * server context
* @return list of Compactors
*/
public static List<HostAndPort> getCompactorAddrs(ServerContext context) {
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
index fad132b..b37a737 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
@@ -36,16 +36,36 @@ import org.apache.accumulo.server.compaction.Compactor.CompactionEnv;
import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
+import com.google.common.annotations.VisibleForTesting;
+
public class CompactionEnvironment implements Closeable, CompactionEnv {
private final ServerContext context;
private final CompactionJobHolder jobHolder;
private final SharedRateLimiterFactory limiter;
+ private String queueName;
+
+ public static class CompactorIterEnv extends TabletIteratorEnvironment {
+
+ private String queueName;
+
+ public CompactorIterEnv(ServerContext context, IteratorScope scope, boolean fullMajC,
+ AccumuloConfiguration tableConfig, TableId tableId, CompactionKind kind, String queueName) {
+ super(context, scope, fullMajC, tableConfig, tableId, kind);
+ this.queueName = queueName;
+ }
+
+ @VisibleForTesting
+ public String getQueueName() {
+ return queueName;
+ }
+ }
- CompactionEnvironment(ServerContext context, CompactionJobHolder jobHolder) {
+ CompactionEnvironment(ServerContext context, CompactionJobHolder jobHolder, String queueName) {
this.context = context;
this.jobHolder = jobHolder;
this.limiter = SharedRateLimiterFactory.getInstance(this.context.getConfiguration());
+ this.queueName = queueName;
}
@Override
@@ -77,9 +97,9 @@ public class CompactionEnvironment implements Closeable, CompactionEnv {
@Override
public SystemIteratorEnvironment createIteratorEnv(ServerContext context,
AccumuloConfiguration acuTableConf, TableId tableId) {
- return new TabletIteratorEnvironment(context, IteratorScope.majc,
+ return new CompactorIterEnv(context, IteratorScope.majc,
!jobHolder.getJob().isPropagateDeletes(), acuTableConf, tableId,
- CompactionKind.valueOf(jobHolder.getJob().getKind().name()));
+ CompactionKind.valueOf(jobHolder.getJob().getKind().name()), queueName);
}
@Override
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 130cda3..0190820 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -500,7 +500,8 @@ public class Compactor extends AbstractServer
job.getIteratorSettings().getIterators()
.forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis)));
- try (CompactionEnvironment cenv = new CompactionEnvironment(getContext(), JOB_HOLDER)) {
+ try (CompactionEnvironment cenv =
+ new CompactionEnvironment(getContext(), JOB_HOLDER, queueName)) {
org.apache.accumulo.server.compaction.Compactor compactor =
new org.apache.accumulo.server.compaction.Compactor(getContext(),
KeyExtent.fromThrift(job.getExtent()), files, outputFile,
diff --git a/test/pom.xml b/test/pom.xml
index 4f24f05..921425e 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -68,6 +68,14 @@
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-compaction-coordinator</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-compactor</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
</dependency>
<dependency>
diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
new file mode 100644
index 0000000..1b47f58
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.compactor.CompactionEnvironment.CompactorIterEnv;
+import org.apache.accumulo.compactor.Compactor;
+import org.apache.accumulo.coordinator.CompactionCoordinator;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
+import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.base.Preconditions;
+
+public class ExternalCompactionIT extends ConfigurableMacBase {
+
+ @Override
+ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setProperty("tserver.compaction.major.service.cs1.planner",
+ DefaultCompactionPlanner.class.getName());
+ cfg.setProperty("tserver.compaction.major.service.cs1.planner.opts.executors",
+ "[{'name':'all','externalQueue':'DCQ1'}]");
+ }
+
+ public static class TestFilter extends Filter {
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+ IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+
+ CompactorIterEnv cienv = (CompactorIterEnv) env;
+
+ Preconditions.checkArgument(!cienv.getQueueName().isEmpty());
+ Preconditions
+ .checkArgument(options.getOrDefault("expectedQ", "").equals(cienv.getQueueName()));
+ }
+
+ @Override
+ public boolean accept(Key k, Value v) {
+ return Integer.parseInt(v.toString()) % 2 == 0;
+ }
+
+ }
+
+ @Test
+ public void testExternalCompaction() throws Exception {
+ try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
+ Map<String,String> props =
+ Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(),
+ "table.compaction.dispatcher.opts.service", "cs1");
+ NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props);
+
+ String tableName = "ectt";
+
+ client.tableOperations().create(tableName, ntc);
+
+ try (BatchWriter bw = client.createBatchWriter(tableName)) {
+ for (int i = 0; i < 10; i++) {
+ Mutation m = new Mutation("r:" + i);
+ m.put("", "", "" + i);
+ bw.addMutation(m);
+ }
+ }
+
+ client.tableOperations().flush(tableName);
+
+ cluster.exec(Compactor.class, "-q", "DCQ1");
+ cluster.exec(CompactionCoordinator.class);
+
+ IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class);
+ // make sure iterator options make it to compactor process
+ iterSetting.addOption("expectedQ", "DCQ1");
+ CompactionConfig config =
+ new CompactionConfig().setIterators(List.of(iterSetting)).setWait(true);
+ client.tableOperations().compact(tableName, config);
+
+ try (Scanner scanner = client.createScanner(tableName)) {
+ int count = 0;
+ for (Entry<Key,Value> entry : scanner) {
+ Assert.assertTrue(Integer.parseInt(entry.getValue().toString()) % 2 == 0);
+ count++;
+ }
+
+ Assert.assertEquals(5, count);
+ }
+ }
+ }
+}