You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2014/12/09 21:05:11 UTC
accumulo git commit: ACCUMULO-3134 Enable file selection and output configuration in compact command
Repository: accumulo
Updated Branches:
refs/heads/master 04774b171 -> 524a81392
ACCUMULO-3134 Enable file selection and output configuration in compact command
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/524a8139
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/524a8139
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/524a8139
Branch: refs/heads/master
Commit: 524a813925d69892c21d5db401e76350c00804f9
Parents: 04774b1
Author: keith@deenlo.com <ke...@deenlo.com>
Authored: Tue Dec 9 14:34:33 2014 -0500
Committer: keith@deenlo.com <ke...@deenlo.com>
Committed: Tue Dec 9 14:34:33 2014 -0500
----------------------------------------------------------------------
.../core/compaction/CompactionSettings.java | 86 +++++++++
.../ConfigurableCompactionStrategy.java | 179 +++++++++++++++++++
.../ConfigurableCompactionStrategyTest.java | 81 +++++++++
.../java/org/apache/accumulo/shell/Shell.java | 1 +
.../accumulo/shell/commands/CompactCommand.java | 78 +++++++-
.../org/apache/accumulo/test/ShellServerIT.java | 107 +++++++++++
6 files changed, 529 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/524a8139/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java b/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java
new file mode 100644
index 0000000..a45a692
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.compaction;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Validates a raw option value and converts it into the form that is stored in a compaction options map.
+ */
+interface Type {
+  String convert(String value);
+}
+
+/**
+ * Parses a memory size (e.g. "64K", "256M") into a byte count and requires it to be positive. The stored value is the plain byte count.
+ */
+class SizeType implements Type {
+  @Override
+  public String convert(String str) {
+    long size = AccumuloConfiguration.getMemoryInBytes(str);
+    // include the rejected input so shell users see why their option was refused
+    Preconditions.checkArgument(size > 0, "Size must be positive: %s", str);
+    return Long.toString(size);
+  }
+}
+
+/**
+ * Accepts a regular expression, rejecting it early if it does not compile, and stores the expression text unchanged.
+ */
+class PatternType implements Type {
+  @Override
+  public String convert(String regex) {
+    // compiling validates the syntax; pattern() hands back the exact source string
+    return Pattern.compile(regex).pattern();
+  }
+}
+
+/**
+ * Accepts a decimal integer that must be strictly positive; the original text is stored unchanged.
+ */
+class UIntType implements Type {
+  @Override
+  public String convert(String str) {
+    // Integer.parseInt rejects non-numeric input; the check below rejects zero and negatives with a readable message
+    Preconditions.checkArgument(Integer.parseInt(str) > 0, "Value must be a positive integer: %s", str);
+    return str;
+  }
+}
+
+/**
+ * Performs no validation; any value is stored exactly as given.
+ */
+class StringType implements Type {
+  @Override
+  public String convert(String value) {
+    return value;
+  }
+}
+
+/**
+ * Options accepted by the configurable compaction strategy. Each constant validates and normalizes its value through a {@link Type} and stores it in an
+ * options map under the constant's name.
+ */
+public enum CompactionSettings {
+
+  SF_GT_ESIZE_OPT(new SizeType()),
+  SF_LT_ESIZE_OPT(new SizeType()),
+  SF_NAME_RE_OPT(new PatternType()),
+  SF_PATH_RE_OPT(new PatternType()),
+  MIN_FILES_OPT(new UIntType()),
+  OUTPUT_COMPRESSION_OPT(new StringType()),
+  OUTPUT_BLOCK_SIZE_OPT(new SizeType()),
+  OUTPUT_HDFS_BLOCK_SIZE_OPT(new SizeType()),
+  OUTPUT_INDEX_BLOCK_SIZE_OPT(new SizeType()),
+  OUTPUT_REPLICATION_OPT(new UIntType());
+
+  // enum constants are shared singletons, so their state must not be reassignable
+  private final Type type;
+
+  CompactionSettings(Type type) {
+    this.type = type;
+  }
+
+  /**
+   * Validates {@code val} for this setting and stores the converted value in {@code options}, keyed by {@link #name()}.
+   *
+   * @param options
+   *          map that receives the converted value
+   * @param val
+   *          raw value, e.g. as typed by a shell user
+   * @throws IllegalArgumentException
+   *           if the value fails validation (for example a non-positive count or an invalid regular expression)
+   */
+  public void put(Map<String,String> options, String val) {
+    options.put(name(), type.convert(val));
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/524a8139/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java
new file mode 100644
index 0000000..ba3ea42
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.tserver.compaction.strategies;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.core.compaction.CompactionSettings;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.tserver.compaction.CompactionPlan;
+import org.apache.accumulo.tserver.compaction.CompactionStrategy;
+import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
+import org.apache.accumulo.tserver.compaction.WriteParameters;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Compaction strategy driven entirely by {@link CompactionSettings} options: it selects input files by size and/or path patterns, requires a minimum
+ * number of selected files, and can override the output file's write parameters.
+ */
+public class ConfigurableCompactionStrategy extends CompactionStrategy {
+
+  /** One file selection criterion, evaluated against a single candidate file. */
+  private static interface Test {
+    boolean shouldCompact(Entry<FileRef,DataFileValue> file, MajorCompactionRequest request);
+  }
+
+  /** Selects a file by comparing the size stored in its DataFileValue against a threshold parsed from the option value. */
+  private static abstract class FileSizeTest implements Test {
+    private final long esize;
+
+    private FileSizeTest(String s) {
+      this.esize = Long.parseLong(s);
+    }
+
+    @Override
+    public boolean shouldCompact(Entry<FileRef,DataFileValue> file, MajorCompactionRequest request) {
+      return shouldCompact(file.getValue().getSize(), esize);
+    }
+
+    /** Compares the file size {@code fsize} against the threshold {@code esize}. */
+    public abstract boolean shouldCompact(long fsize, long esize);
+  }
+
+  /** Selects a file when the string derived from its path by {@link #getInput(Path)} fully matches a regular expression. */
+  private static abstract class PatternPathTest implements Test {
+    private final Pattern pattern;
+
+    private PatternPathTest(String p) {
+      this.pattern = Pattern.compile(p);
+    }
+
+    @Override
+    public boolean shouldCompact(Entry<FileRef,DataFileValue> file, MajorCompactionRequest request) {
+      return pattern.matcher(getInput(file.getKey().path())).matches();
+    }
+
+    /** Returns the part of {@code path} that the pattern is matched against. */
+    public abstract String getInput(Path path);
+  }
+
+  private final List<Test> tests = new ArrayList<>();
+  // Currently always true: every configured test must pass for a file to be selected.
+  private boolean andTest = true;
+  private int minFiles = 1;
+  private final WriteParameters writeParams = new WriteParameters();
+
+  /**
+   * Configures the strategy. Every key must be the name of a {@link CompactionSettings} constant. Values are parsed directly (e.g. with
+   * {@code Long.parseLong}), so sizes must already be plain byte counts, as produced by {@link CompactionSettings#put(Map, String)}.
+   *
+   * @throws IllegalArgumentException
+   *           if a key does not name a known setting or a value cannot be parsed
+   */
+  @Override
+  public void init(Map<String,String> options) {
+
+    Set<Entry<String,String>> es = options.entrySet();
+    for (Entry<String,String> entry : es) {
+
+      CompactionSettings setting;
+      try {
+        setting = CompactionSettings.valueOf(entry.getKey());
+      } catch (IllegalArgumentException e) {
+        // Enum.valueOf reports the enum class name; report the offending option instead
+        throw new IllegalArgumentException("Unknown option " + entry.getKey(), e);
+      }
+
+      switch (setting) {
+        case SF_LT_ESIZE_OPT:
+          tests.add(new FileSizeTest(entry.getValue()) {
+            @Override
+            public boolean shouldCompact(long fsize, long esize) {
+              return fsize < esize;
+            }
+          });
+          break;
+        case SF_GT_ESIZE_OPT:
+          tests.add(new FileSizeTest(entry.getValue()) {
+            @Override
+            public boolean shouldCompact(long fsize, long esize) {
+              return fsize > esize;
+            }
+          });
+          break;
+        case SF_NAME_RE_OPT:
+          tests.add(new PatternPathTest(entry.getValue()) {
+            @Override
+            public String getInput(Path path) {
+              return path.getName();
+            }
+          });
+          break;
+        case SF_PATH_RE_OPT:
+          tests.add(new PatternPathTest(entry.getValue()) {
+            @Override
+            public String getInput(Path path) {
+              return path.toString();
+            }
+          });
+          break;
+        case MIN_FILES_OPT:
+          minFiles = Integer.parseInt(entry.getValue());
+          break;
+        case OUTPUT_COMPRESSION_OPT:
+          writeParams.setCompressType(entry.getValue());
+          break;
+        case OUTPUT_BLOCK_SIZE_OPT:
+          writeParams.setBlockSize(Long.parseLong(entry.getValue()));
+          break;
+        case OUTPUT_INDEX_BLOCK_SIZE_OPT:
+          writeParams.setIndexBlockSize(Long.parseLong(entry.getValue()));
+          break;
+        case OUTPUT_HDFS_BLOCK_SIZE_OPT:
+          writeParams.setHdfsBlockSize(Long.parseLong(entry.getValue()));
+          break;
+        case OUTPUT_REPLICATION_OPT:
+          writeParams.setReplication(Integer.parseInt(entry.getValue()));
+          break;
+        default:
+          // only reachable if a CompactionSettings constant is added without a matching case here
+          throw new IllegalArgumentException("Unknown option " + entry.getKey());
+      }
+    }
+  }
+
+  /**
+   * Returns the files of {@code request} that pass the configured tests. When no tests are configured every file is selected.
+   */
+  private List<FileRef> getFilesToCompact(MajorCompactionRequest request) {
+    List<FileRef> filesToCompact = new ArrayList<>();
+
+    for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) {
+      boolean compact = false;
+      for (Test test : tests) {
+        if (andTest) {
+          compact = test.shouldCompact(entry, request);
+          // short-circuit: one failing test rejects the file
+          if (!compact)
+            break;
+        } else {
+          compact |= test.shouldCompact(entry, request);
+        }
+      }
+
+      if (compact || tests.isEmpty())
+        filesToCompact.add(entry.getKey());
+    }
+    return filesToCompact;
+  }
+
+  /** Compacts only when at least {@code minFiles} files are selected. */
+  @Override
+  public boolean shouldCompact(MajorCompactionRequest request) throws IOException {
+    return getFilesToCompact(request).size() >= minFiles;
+  }
+
+  /**
+   * Builds a plan over the selected files using the configured write parameters. Returns null when fewer than {@code minFiles} files are selected.
+   */
+  @Override
+  public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException {
+    List<FileRef> filesToCompact = getFilesToCompact(request);
+    if (filesToCompact.size() >= minFiles) {
+      CompactionPlan plan = new CompactionPlan();
+      plan.inputFiles.addAll(filesToCompact);
+      plan.writeParameters = writeParams;
+
+      return plan;
+    }
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/524a8139/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java
new file mode 100644
index 0000000..a896537
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.tserver.compaction.strategies;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.compaction.CompactionSettings;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.tserver.compaction.CompactionPlan;
+import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
+import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for the output (write parameter) options of ConfigurableCompactionStrategy.
+ */
+public class ConfigurableCompactionStrategyTest {
+
+  // file selection options are adequately tested by ShellServerIT
+
+  @Test
+  public void testOutputOptions() throws Exception {
+    Map<FileRef,DataFileValue> inputFiles = new HashMap<>();
+    inputFiles.put(new FileRef("hdfs://nn1/accumulo/tables/1/t-009/F00001.rf"), new DataFileValue(50000, 400));
+
+    MajorCompactionRequest request = new MajorCompactionRequest(new KeyExtent(new Text("1"), null, null), MajorCompactionReason.USER, null, null);
+    request.setFiles(inputFiles);
+
+    // with an empty option map, no write parameter should be overridden
+    Map<String,String> options = new HashMap<>();
+    ConfigurableCompactionStrategy strategy = new ConfigurableCompactionStrategy();
+    strategy.init(options);
+
+    CompactionPlan plan = strategy.getCompactionPlan(request);
+
+    Assert.assertEquals(0, plan.writeParameters.getBlockSize());
+    Assert.assertEquals(0, plan.writeParameters.getHdfsBlockSize());
+    Assert.assertEquals(0, plan.writeParameters.getIndexBlockSize());
+    Assert.assertEquals(0, plan.writeParameters.getReplication());
+    Assert.assertNull(plan.writeParameters.getCompressType());
+
+    // with every output option set, each should show up in the plan
+    CompactionSettings.OUTPUT_BLOCK_SIZE_OPT.put(options, "64K");
+    CompactionSettings.OUTPUT_COMPRESSION_OPT.put(options, "snappy");
+    CompactionSettings.OUTPUT_HDFS_BLOCK_SIZE_OPT.put(options, "256M");
+    CompactionSettings.OUTPUT_INDEX_BLOCK_SIZE_OPT.put(options, "32K");
+    CompactionSettings.OUTPUT_REPLICATION_OPT.put(options, "5");
+
+    strategy = new ConfigurableCompactionStrategy();
+    strategy.init(options);
+    plan = strategy.getCompactionPlan(request);
+
+    Assert.assertEquals(AccumuloConfiguration.getMemoryInBytes("64K"), plan.writeParameters.getBlockSize());
+    Assert.assertEquals(AccumuloConfiguration.getMemoryInBytes("256M"), plan.writeParameters.getHdfsBlockSize());
+    Assert.assertEquals(AccumuloConfiguration.getMemoryInBytes("32K"), plan.writeParameters.getIndexBlockSize());
+    Assert.assertEquals(5, plan.writeParameters.getReplication());
+    Assert.assertEquals("snappy", plan.writeParameters.getCompressType());
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/524a8139/shell/src/main/java/org/apache/accumulo/shell/Shell.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/accumulo/shell/Shell.java b/shell/src/main/java/org/apache/accumulo/shell/Shell.java
index 8927ee0..8aadd68 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/Shell.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/Shell.java
@@ -1104,6 +1104,7 @@ public class Shell extends ShellOptions {
public static final void setDebugging(boolean debuggingEnabled) {
Logger.getLogger(Constants.CORE_PACKAGE_NAME).setLevel(debuggingEnabled ? Level.TRACE : Level.INFO);
+ // Apply the same level to the logger for the Shell class's own package, not only the core package.
+ Logger.getLogger(Shell.class.getPackage().getName()).setLevel(debuggingEnabled ? Level.TRACE : Level.INFO);
}
public static final boolean isDebuggingEnabled() {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/524a8139/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java
index 660630e..9e599ae 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
+import org.apache.accumulo.core.compaction.CompactionSettings;
import org.apache.accumulo.shell.Shell;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
@@ -35,6 +36,10 @@ import org.apache.commons.cli.Options;
public class CompactCommand extends TableOperation {
private Option noFlushOption, waitOpt, profileOpt, cancelOpt, strategyOpt, strategyConfigOpt;
+ // file selection and file output options
+ private Option enameOption, epathOption, sizeLtOption, sizeGtOption, minFilesOption, outBlockSizeOpt, outHdfsBlockSizeOpt, outIndexBlockSizeOpt,
+ outCompressionOpt, outReplication;
+
private CompactionConfig compactionConfig = null;
boolean override = false;
@@ -43,7 +48,9 @@ public class CompactCommand extends TableOperation {
@Override
public String description() {
+ // Description of the compact command (presumably shown as its help text); mentions the file selection and output options this change adds.
- return "sets all tablets for a table to major compact as soon as possible (based on current time)";
+ return "Initiates a major compaction on tablets within the specified range that have one or more files. If no file selection options are specified, then "
+ + "all files will be compacted. Options that configure output settings are only applied to this compaction and not later compactions. If multiple "
+ + "concurrent user initiated compactions specify iterators or a compaction strategy, then all but one will fail to start.";
}
protected void doTableOp(final Shell shellState, final String tableName) throws AccumuloException, AccumuloSecurityException {
@@ -70,7 +77,29 @@ public class CompactCommand extends TableOperation {
}
}
}
-
+
+  /**
+   * If {@code opt} was given on the command line, validates its value through {@code setting} and stores it in {@code opts}.
+   */
+  private void put(CommandLine cl, Map<String,String> opts, Option opt, CompactionSettings setting) {
+    String longOpt = opt.getLongOpt();
+    if (!cl.hasOption(longOpt)) {
+      return;
+    }
+    setting.put(opts, cl.getOptionValue(longOpt));
+  }
+
+  /**
+   * Gathers the file selection and output options present on the command line into an options map for the configurable compaction strategy. The map
+   * is empty when none of those options were given.
+   */
+  private Map<String,String> getConfigurableCompactionStrategyOpts(CommandLine cl) {
+    // parallel arrays: shellOptions[i] is stored using settings[i]
+    Option[] shellOptions = {enameOption, epathOption, sizeLtOption, sizeGtOption, minFilesOption, outCompressionOpt, outBlockSizeOpt,
+        outHdfsBlockSizeOpt, outIndexBlockSizeOpt, outReplication};
+    CompactionSettings[] settings = {CompactionSettings.SF_NAME_RE_OPT, CompactionSettings.SF_PATH_RE_OPT, CompactionSettings.SF_LT_ESIZE_OPT,
+        CompactionSettings.SF_GT_ESIZE_OPT, CompactionSettings.MIN_FILES_OPT, CompactionSettings.OUTPUT_COMPRESSION_OPT,
+        CompactionSettings.OUTPUT_BLOCK_SIZE_OPT, CompactionSettings.OUTPUT_HDFS_BLOCK_SIZE_OPT, CompactionSettings.OUTPUT_INDEX_BLOCK_SIZE_OPT,
+        CompactionSettings.OUTPUT_REPLICATION_OPT};
+
+    Map<String,String> strategyOpts = new HashMap<>();
+    for (int i = 0; i < shellOptions.length; i++) {
+      put(cl, strategyOpts, shellOptions[i], settings[i]);
+    }
+    return strategyOpts;
+  }
+
@Override
public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws Exception {
@@ -101,7 +130,12 @@ public class CompactCommand extends TableOperation {
compactionConfig.setIterators(new ArrayList<>(iterators));
}
+ Map<String,String> configurableCompactOpt = getConfigurableCompactionStrategyOpts(cl);
+
if (cl.hasOption(strategyOpt.getOpt())) {
+ if (configurableCompactOpt.size() > 0)
+ throw new IllegalArgumentException("Can not specify compaction strategy with file selection and file output options.");
+
CompactionStrategyConfig csc = new CompactionStrategyConfig(cl.getOptionValue(strategyOpt.getOpt()));
if (cl.hasOption(strategyConfigOpt.getOpt())) {
Map<String,String> props = new HashMap<>();
@@ -117,9 +151,19 @@ public class CompactCommand extends TableOperation {
compactionConfig.setCompactionStrategy(csc);
}
+ if (configurableCompactOpt.size() > 0) {
+ CompactionStrategyConfig csc = new CompactionStrategyConfig("org.apache.accumulo.tserver.compaction.strategies.ConfigurableCompactionStrategy");
+ csc.setOptions(configurableCompactOpt);
+ compactionConfig.setCompactionStrategy(csc);
+ }
+
return super.execute(fullCommand, cl, shellState);
}
+ private Option newLAO(String lopt, String desc) {
+ return new Option(null, lopt, true, desc);
+ }
+
@Override
public Options getOptions() {
final Options opts = super.getOptions();
@@ -131,7 +175,7 @@ public class CompactCommand extends TableOperation {
waitOpt = new Option("w", "wait", false, "wait for compact to finish");
opts.addOption(waitOpt);
- profileOpt = new Option("pn", "profile", true, "iterator profile name");
+ profileOpt = new Option("pn", "profile", true, "Iterator profile name.");
profileOpt.setArgName("profile");
opts.addOption(profileOpt);
@@ -143,6 +187,34 @@ public class CompactCommand extends TableOperation {
cancelOpt = new Option(null, "cancel", false, "cancel user initiated compactions");
opts.addOption(cancelOpt);
+ enameOption = newLAO("sf-ename", "Select files using regular expression to match file names. Only matches against last part of path.");
+ opts.addOption(enameOption);
+ epathOption = newLAO("sf-epath", "Select files using regular expression to match file paths to compact. Matches against full path.");
+ opts.addOption(epathOption);
+ sizeLtOption = newLAO("sf-lt-esize",
+ "Selects files less than specified size. Uses the estimated size of file in metadata table. Can use K,M, and G suffixes");
+ opts.addOption(sizeLtOption);
+ sizeGtOption = newLAO("sf-gt-esize",
+ "Selects files greater than specified size. Uses the estimated size of file in metadata table. Can use K,M, and G suffixes");
+ opts.addOption(sizeGtOption);
+ minFilesOption = newLAO("min-files",
+ "Only compacts if at least the specified number of files are selected. When no file selection criteria are given, all files are selected.");
+ opts.addOption(minFilesOption);
+ outBlockSizeOpt = newLAO("out-data-bs",
+ "Rfile data block size to use for compaction output file. Can use K,M, and G suffixes. Uses table settings if not specified.");
+ opts.addOption(outBlockSizeOpt);
+ outHdfsBlockSizeOpt = newLAO("out-hdfs-bs",
+ "HDFS block size to use for compaction output file. Can use K,M, and G suffixes. Uses table settings if not specified.");
+ opts.addOption(outHdfsBlockSizeOpt);
+ outIndexBlockSizeOpt = newLAO("out-index-bs",
+ "Rfile index block size to use for compaction output file. Can use K,M, and G suffixes. Uses table settings if not specified.");
+ opts.addOption(outIndexBlockSizeOpt);
+ outCompressionOpt = newLAO("out-compress",
+ "Compression to use for compaction output file. Either snappy, gz, lzo, or none. Uses table settings if not specified.");
+ opts.addOption(outCompressionOpt);
+ outReplication = newLAO("out-replication", "HDFS replication to use for compaction output file. Uses table settings if not specified.");
+ opts.addOption(outReplication);
+
return opts;
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/524a8139/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
index d878c7f..e4104ce 100644
--- a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
@@ -35,6 +35,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Random;
import jline.console.ConsoleReader;
@@ -753,6 +754,112 @@ public class ShellServerIT extends SharedMiniClusterIT {
}
@Test
+  public void testCompactionSelection() throws Exception {
+    final String table = name.getMethodName();
+    final String clone = table + "_clone";
+
+    ts.exec("createtable " + table);
+    ts.exec("insert a b c d");
+    ts.exec("flush -w");
+    ts.exec("insert x y z v");
+    ts.exec("flush -w");
+
+    // presumably a high majc ratio keeps automatic compactions from changing the clone's file counts during the test
+    ts.exec("clonetable -s " + Property.TABLE_MAJC_RATIO.getKey() + "=10 " + table + " " + clone);
+
+    ts.exec("table " + clone);
+    ts.exec("insert m n l o");
+    ts.exec("flush -w");
+
+    String tableId = getTableId(table);
+    String cloneId = getTableId(clone);
+
+    assertEquals(3, countFiles(cloneId));
+
+    // compact only files from src table
+    ts.exec("compact -t " + clone + " -w --sf-epath .*tables/" + tableId + ".*");
+
+    assertEquals(2, countFiles(cloneId));
+
+    ts.exec("insert r s t u");
+    ts.exec("flush -w");
+
+    assertEquals(3, countFiles(cloneId));
+
+    // compact all flush files
+    ts.exec("compact -t " + clone + " -w --sf-ename F.*");
+
+    assertEquals(2, countFiles(cloneId));
+
+    // create two large files
+    Random rand = new Random();
+    StringBuilder sb = new StringBuilder("insert b v q ");
+    for (int i = 0; i < 10000; i++) {
+      // the cast is required: 'a' + int is an int, and append(int) would add its decimal digits instead of a letter
+      sb.append((char) ('a' + rand.nextInt(26)));
+    }
+
+    ts.exec(sb.toString());
+    ts.exec("flush -w");
+
+    ts.exec(sb.toString());
+    ts.exec("flush -w");
+
+    assertEquals(4, countFiles(cloneId));
+
+    // compact only small files
+    ts.exec("compact -t " + clone + " -w --sf-lt-esize 1000");
+
+    assertEquals(3, countFiles(cloneId));
+
+    // compact large files if 3 or more
+    ts.exec("compact -t " + clone + " -w --sf-gt-esize 1K --min-files 3");
+
+    assertEquals(3, countFiles(cloneId));
+
+    // compact large files if 2 or more
+    ts.exec("compact -t " + clone + " -w --sf-gt-esize 1K --min-files 2");
+
+    assertEquals(2, countFiles(cloneId));
+
+    // compact if tablet has 3 or more files
+    ts.exec("compact -t " + clone + " -w --min-files 3");
+
+    assertEquals(2, countFiles(cloneId));
+
+    // compact if tablet has 2 or more files
+    ts.exec("compact -t " + clone + " -w --min-files 2");
+
+    assertEquals(1, countFiles(cloneId));
+
+    // create two small and one large flush files in order to test AND
+    ts.exec(sb.toString());
+    ts.exec("flush -w");
+
+    ts.exec("insert m n l o");
+    ts.exec("flush -w");
+
+    ts.exec("insert m n l o");
+    ts.exec("flush -w");
+
+    assertEquals(4, countFiles(cloneId));
+
+    // should only compact two small flush files leaving large flush file
+    ts.exec("compact -t " + clone + " -w --sf-ename F.* --sf-lt-esize 1K");
+
+    assertEquals(3, countFiles(cloneId));
+  }
+
+ @Test
+ public void testCompactionSelectionAndStrategy() throws Exception {
+
+ final String table = name.getMethodName();
+
+ ts.exec("createtable " + table);
+
+ // expect this to fail
+ ts.exec("compact -t " + table + " -w --sf-ename F.* -s " + TestCompactionStrategy.class.getName() + " -sc inputPrefix=F,dropPrefix=A", false);
+ }
+
+ @Test
public void constraint() throws Exception {
final String table = name.getMethodName();