You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2014/12/09 21:05:11 UTC

accumulo git commit: ACCUMULO-3134 Enable file selection and output configuration in compact command

Repository: accumulo
Updated Branches:
  refs/heads/master 04774b171 -> 524a81392


ACCUMULO-3134 Enable file selection and output configuration in compact command


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/524a8139
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/524a8139
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/524a8139

Branch: refs/heads/master
Commit: 524a813925d69892c21d5db401e76350c00804f9
Parents: 04774b1
Author: keith@deenlo.com <ke...@deenlo.com>
Authored: Tue Dec 9 14:34:33 2014 -0500
Committer: keith@deenlo.com <ke...@deenlo.com>
Committed: Tue Dec 9 14:34:33 2014 -0500

----------------------------------------------------------------------
 .../core/compaction/CompactionSettings.java     |  86 +++++++++
 .../ConfigurableCompactionStrategy.java         | 179 +++++++++++++++++++
 .../ConfigurableCompactionStrategyTest.java     |  81 +++++++++
 .../java/org/apache/accumulo/shell/Shell.java   |   1 +
 .../accumulo/shell/commands/CompactCommand.java |  78 +++++++-
 .../org/apache/accumulo/test/ShellServerIT.java | 107 +++++++++++
 6 files changed, 529 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/524a8139/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java b/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java
new file mode 100644
index 0000000..a45a692
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.compaction;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+
+import com.google.common.base.Preconditions;
+
+interface Type {
+  String convert(String str);
+}
+
+class SizeType implements Type {
+  @Override
+  public String convert(String str) {
+    long size = AccumuloConfiguration.getMemoryInBytes(str);
+    Preconditions.checkArgument(size > 0);
+    return Long.toString(size);
+  }
+}
+
+class PatternType implements Type {
+  @Override
+  public String convert(String str) {
+    // ensure it compiles
+    Pattern.compile(str);
+    return str;
+  }
+}
+
+class UIntType implements Type {
+  @Override
+  public String convert(String str) {
+    Preconditions.checkArgument(Integer.parseInt(str) > 0);
+    return str;
+  }
+}
+
+class StringType implements Type {
+  @Override
+  public String convert(String str) {
+    return str;
+  }
+}
+
+public enum CompactionSettings {
+
+  SF_GT_ESIZE_OPT(new SizeType()),
+  SF_LT_ESIZE_OPT(new SizeType()),
+  SF_NAME_RE_OPT(new PatternType()),
+  SF_PATH_RE_OPT(new PatternType()),
+  MIN_FILES_OPT(new UIntType()),
+  OUTPUT_COMPRESSION_OPT(new StringType()),
+  OUTPUT_BLOCK_SIZE_OPT(new SizeType()),
+  OUTPUT_HDFS_BLOCK_SIZE_OPT(new SizeType()),
+  OUTPUT_INDEX_BLOCK_SIZE_OPT(new SizeType()),
+  OUTPUT_REPLICATION_OPT(new UIntType());
+
+  private Type type;
+
+  private CompactionSettings(Type type) {
+    this.type = type;
+  }
+
+  public void put(Map<String,String> options, String val) {
+    options.put(name(), type.convert(val));
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/524a8139/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java
new file mode 100644
index 0000000..ba3ea42
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.tserver.compaction.strategies;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.core.compaction.CompactionSettings;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.tserver.compaction.CompactionPlan;
+import org.apache.accumulo.tserver.compaction.CompactionStrategy;
+import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
+import org.apache.accumulo.tserver.compaction.WriteParameters;
+import org.apache.hadoop.fs.Path;
+
+public class ConfigurableCompactionStrategy extends CompactionStrategy {
+
+  private static interface Test {
+    boolean shouldCompact(Entry<FileRef,DataFileValue> file, MajorCompactionRequest request);
+  }
+
+  private static abstract class FileSizeTest implements Test {
+    private final long esize;
+
+    private FileSizeTest(String s) {
+      this.esize = Long.parseLong(s);
+    }
+
+    @Override
+    public boolean shouldCompact(Entry<FileRef,DataFileValue> file, MajorCompactionRequest request) {
+      return shouldCompact(file.getValue().getSize(), esize);
+    }
+
+    public abstract boolean shouldCompact(long fsize, long esize);
+  }
+
+  private static abstract class PatternPathTest implements Test {
+    private Pattern pattern;
+
+    private PatternPathTest(String p) {
+      this.pattern = Pattern.compile(p);
+    }
+
+    @Override
+    public boolean shouldCompact(Entry<FileRef,DataFileValue> file, MajorCompactionRequest request) {
+      return pattern.matcher(getInput(file.getKey().path())).matches();
+    }
+
+    public abstract String getInput(Path path);
+
+  }
+
+  private List<Test> tests = new ArrayList<>();
+  private boolean andTest = true;
+  private int minFiles = 1;
+  private WriteParameters writeParams = new WriteParameters();
+
+  public void init(Map<String,String> options) {
+
+    Set<Entry<String,String>> es = options.entrySet();
+    for (Entry<String,String> entry : es) {
+
+      switch (CompactionSettings.valueOf(entry.getKey())) {
+        case SF_LT_ESIZE_OPT:
+          tests.add(new FileSizeTest(entry.getValue()) {
+            @Override
+            public boolean shouldCompact(long fsize, long esize) {
+              return fsize < esize;
+            }
+          });
+          break;
+        case SF_GT_ESIZE_OPT:
+          tests.add(new FileSizeTest(entry.getValue()) {
+            @Override
+            public boolean shouldCompact(long fsize, long esize) {
+              return fsize > esize;
+            }
+          });
+          break;
+        case SF_NAME_RE_OPT:
+          tests.add(new PatternPathTest(entry.getValue()) {
+            @Override
+            public String getInput(Path path) {
+              return path.getName();
+            }
+          });
+          break;
+        case SF_PATH_RE_OPT:
+          tests.add(new PatternPathTest(entry.getValue()) {
+            @Override
+            public String getInput(Path path) {
+              return path.toString();
+            }
+          });
+          break;
+        case MIN_FILES_OPT:
+          minFiles = Integer.parseInt(entry.getValue());
+          break;
+        case OUTPUT_COMPRESSION_OPT:
+          writeParams.setCompressType(entry.getValue());
+          break;
+        case OUTPUT_BLOCK_SIZE_OPT:
+          writeParams.setBlockSize(Long.parseLong(entry.getValue()));
+          break;
+        case OUTPUT_INDEX_BLOCK_SIZE_OPT:
+          writeParams.setIndexBlockSize(Long.parseLong(entry.getValue()));
+          break;
+        case OUTPUT_HDFS_BLOCK_SIZE_OPT:
+          writeParams.setHdfsBlockSize(Long.parseLong(entry.getValue()));
+          break;
+        case OUTPUT_REPLICATION_OPT:
+          writeParams.setReplication(Integer.parseInt(entry.getValue()));
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown option " + entry.getKey());
+      }
+    }
+  }
+
+  private List<FileRef> getFilesToCompact(MajorCompactionRequest request) {
+    List<FileRef> filesToCompact = new ArrayList<>();
+
+    for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) {
+      boolean compact = false;
+      for (Test test : tests) {
+        if (andTest) {
+          compact = test.shouldCompact(entry, request);
+          if (!compact)
+            break;
+        } else {
+          compact |= test.shouldCompact(entry, request);
+        }
+      }
+
+      if (compact || tests.isEmpty())
+        filesToCompact.add(entry.getKey());
+    }
+    return filesToCompact;
+  }
+
+  @Override
+  public boolean shouldCompact(MajorCompactionRequest request) throws IOException {
+    return getFilesToCompact(request).size() >= minFiles;
+  }
+
+  @Override
+  public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException {
+    List<FileRef> filesToCompact = getFilesToCompact(request);
+    if (filesToCompact.size() >= minFiles) {
+      CompactionPlan plan = new CompactionPlan();
+      plan.inputFiles.addAll(filesToCompact);
+      plan.writeParameters = writeParams;
+
+      return plan;
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/524a8139/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java
new file mode 100644
index 0000000..a896537
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.tserver.compaction.strategies;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.compaction.CompactionSettings;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.tserver.compaction.CompactionPlan;
+import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
+import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConfigurableCompactionStrategyTest {
+
+  // file selection options are adequately tested by ShellServerIT
+
+  @Test
+  public void testOutputOptions() throws Exception {
+    MajorCompactionRequest mcr = new MajorCompactionRequest(new KeyExtent(new Text("1"), null, null), MajorCompactionReason.USER, null, null);
+
+    Map<FileRef,DataFileValue> files = new HashMap<>();
+    files.put(new FileRef("hdfs://nn1/accumulo/tables/1/t-009/F00001.rf"), new DataFileValue(50000, 400));
+    mcr.setFiles(files);
+
+    // test setting no output options
+    ConfigurableCompactionStrategy ccs = new ConfigurableCompactionStrategy();
+
+    Map<String,String> opts = new HashMap<>();
+    ccs.init(opts);
+
+    CompactionPlan plan = ccs.getCompactionPlan(mcr);
+
+    Assert.assertEquals(0, plan.writeParameters.getBlockSize());
+    Assert.assertEquals(0, plan.writeParameters.getHdfsBlockSize());
+    Assert.assertEquals(0, plan.writeParameters.getIndexBlockSize());
+    Assert.assertEquals(0, plan.writeParameters.getReplication());
+    Assert.assertEquals(null, plan.writeParameters.getCompressType());
+
+    // test setting all output options
+    ccs = new ConfigurableCompactionStrategy();
+
+    CompactionSettings.OUTPUT_BLOCK_SIZE_OPT.put(opts, "64K");
+    CompactionSettings.OUTPUT_COMPRESSION_OPT.put(opts, "snappy");
+    CompactionSettings.OUTPUT_HDFS_BLOCK_SIZE_OPT.put(opts, "256M");
+    CompactionSettings.OUTPUT_INDEX_BLOCK_SIZE_OPT.put(opts, "32K");
+    CompactionSettings.OUTPUT_REPLICATION_OPT.put(opts, "5");
+
+    ccs.init(opts);
+
+    plan = ccs.getCompactionPlan(mcr);
+
+    Assert.assertEquals(AccumuloConfiguration.getMemoryInBytes("64K"), plan.writeParameters.getBlockSize());
+    Assert.assertEquals(AccumuloConfiguration.getMemoryInBytes("256M"), plan.writeParameters.getHdfsBlockSize());
+    Assert.assertEquals(AccumuloConfiguration.getMemoryInBytes("32K"), plan.writeParameters.getIndexBlockSize());
+    Assert.assertEquals(5, plan.writeParameters.getReplication());
+    Assert.assertEquals("snappy", plan.writeParameters.getCompressType());
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/524a8139/shell/src/main/java/org/apache/accumulo/shell/Shell.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/accumulo/shell/Shell.java b/shell/src/main/java/org/apache/accumulo/shell/Shell.java
index 8927ee0..8aadd68 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/Shell.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/Shell.java
@@ -1104,6 +1104,7 @@ public class Shell extends ShellOptions {
 
   public static final void setDebugging(boolean debuggingEnabled) {
     Logger.getLogger(Constants.CORE_PACKAGE_NAME).setLevel(debuggingEnabled ? Level.TRACE : Level.INFO);
+    Logger.getLogger(Shell.class.getPackage().getName()).setLevel(debuggingEnabled ? Level.TRACE : Level.INFO);
   }
 
   public static final boolean isDebuggingEnabled() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/524a8139/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java
index 660630e..9e599ae 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
+import org.apache.accumulo.core.compaction.CompactionSettings;
 import org.apache.accumulo.shell.Shell;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
@@ -35,6 +36,10 @@ import org.apache.commons.cli.Options;
 public class CompactCommand extends TableOperation {
   private Option noFlushOption, waitOpt, profileOpt, cancelOpt, strategyOpt, strategyConfigOpt;
 
+  // file selection and file output options
+  private Option enameOption, epathOption, sizeLtOption, sizeGtOption, minFilesOption, outBlockSizeOpt, outHdfsBlockSizeOpt, outIndexBlockSizeOpt,
+      outCompressionOpt, outReplication;
+
   private CompactionConfig compactionConfig = null;
   
   boolean override = false;
@@ -43,7 +48,9 @@ public class CompactCommand extends TableOperation {
 
   @Override
   public String description() {
-    return "sets all tablets for a table to major compact as soon as possible (based on current time)";
+    return "Initiates a major compaction on tablets within the specified range that have one or more files.  If no file selection options are specified, then "
+        + "all files will be compacted.  Options that configure output settings are only applied to this compaction and not later compactions.  If multiple "
+        + "concurrent user initiated compactions specify iterators or a compaction strategy, then all but one will fail to start.";
   }
   
   protected void doTableOp(final Shell shellState, final String tableName) throws AccumuloException, AccumuloSecurityException {
@@ -70,7 +77,29 @@ public class CompactCommand extends TableOperation {
       }
     }
   }
-  
+
+  private void put(CommandLine cl, Map<String,String> opts, Option opt, CompactionSettings setting) {
+    if (cl.hasOption(opt.getLongOpt()))
+      setting.put(opts, cl.getOptionValue(opt.getLongOpt()));
+  }
+
+  private Map<String,String> getConfigurableCompactionStrategyOpts(CommandLine cl) {
+    Map<String,String> opts = new HashMap<>();
+
+    put(cl, opts, enameOption, CompactionSettings.SF_NAME_RE_OPT);
+    put(cl, opts, epathOption, CompactionSettings.SF_PATH_RE_OPT);
+    put(cl, opts, sizeLtOption, CompactionSettings.SF_LT_ESIZE_OPT);
+    put(cl, opts, sizeGtOption, CompactionSettings.SF_GT_ESIZE_OPT);
+    put(cl, opts, minFilesOption, CompactionSettings.MIN_FILES_OPT);
+    put(cl, opts, outCompressionOpt, CompactionSettings.OUTPUT_COMPRESSION_OPT);
+    put(cl, opts, outBlockSizeOpt, CompactionSettings.OUTPUT_BLOCK_SIZE_OPT);
+    put(cl, opts, outHdfsBlockSizeOpt, CompactionSettings.OUTPUT_HDFS_BLOCK_SIZE_OPT);
+    put(cl, opts, outIndexBlockSizeOpt, CompactionSettings.OUTPUT_INDEX_BLOCK_SIZE_OPT);
+    put(cl, opts, outReplication, CompactionSettings.OUTPUT_REPLICATION_OPT);
+
+    return opts;
+  }
+
   @Override
   public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws Exception {
     
@@ -101,7 +130,12 @@ public class CompactCommand extends TableOperation {
       compactionConfig.setIterators(new ArrayList<>(iterators));
     }
 
+    Map<String,String> configurableCompactOpt = getConfigurableCompactionStrategyOpts(cl);
+
     if (cl.hasOption(strategyOpt.getOpt())) {
+      if (configurableCompactOpt.size() > 0)
+        throw new IllegalArgumentException("Can not specify compaction strategy with file selection and file output options.");
+
       CompactionStrategyConfig csc = new CompactionStrategyConfig(cl.getOptionValue(strategyOpt.getOpt()));
       if (cl.hasOption(strategyConfigOpt.getOpt())) {
         Map<String,String> props = new HashMap<>();
@@ -117,9 +151,19 @@ public class CompactCommand extends TableOperation {
       compactionConfig.setCompactionStrategy(csc);
     }
 
+    if (configurableCompactOpt.size() > 0) {
+      CompactionStrategyConfig csc = new CompactionStrategyConfig("org.apache.accumulo.tserver.compaction.strategies.ConfigurableCompactionStrategy");
+      csc.setOptions(configurableCompactOpt);
+      compactionConfig.setCompactionStrategy(csc);
+    }
+
     return super.execute(fullCommand, cl, shellState);
   }
   
+  private Option newLAO(String lopt, String desc) {
+    return new Option(null, lopt, true, desc);
+  }
+
   @Override
   public Options getOptions() {
     final Options opts = super.getOptions();
@@ -131,7 +175,7 @@ public class CompactCommand extends TableOperation {
     waitOpt = new Option("w", "wait", false, "wait for compact to finish");
     opts.addOption(waitOpt);
     
-    profileOpt = new Option("pn", "profile", true, "iterator profile name");
+    profileOpt = new Option("pn", "profile", true, "Iterator profile name.");
     profileOpt.setArgName("profile");
     opts.addOption(profileOpt);
 
@@ -143,6 +187,34 @@ public class CompactCommand extends TableOperation {
     cancelOpt = new Option(null, "cancel", false, "cancel user initiated compactions");
     opts.addOption(cancelOpt);
 
+    enameOption = newLAO("sf-ename", "Select files using regular expression to match file names. Only matches against last part of path.");
+    opts.addOption(enameOption);
+    epathOption = newLAO("sf-epath", "Select files using regular expression to match file paths to compact. Matches against full path.");
+    opts.addOption(epathOption);
+    sizeLtOption = newLAO("sf-lt-esize",
+        "Selects files less than specified size.  Uses the estimated size of file in metadata table.  Can use K,M, and G suffixes");
+    opts.addOption(sizeLtOption);
+    sizeGtOption = newLAO("sf-gt-esize",
+        "Selects files greater than specified size.  Uses the estimated size of file in metadata table.  Can use K,M, and G suffixes");
+    opts.addOption(sizeGtOption);
+    minFilesOption = newLAO("min-files",
+        "Only compacts if at least the specified number of files are selected.  When no file selection criteria are given, all files are selected.");
+    opts.addOption(minFilesOption);
+    outBlockSizeOpt = newLAO("out-data-bs",
+        "Rfile data block size to use for compaction output file.  Can use K,M, and G suffixes. Uses table settings if not specified.");
+    opts.addOption(outBlockSizeOpt);
+    outHdfsBlockSizeOpt = newLAO("out-hdfs-bs",
+        "HDFS block size to use for compaction output file.  Can use K,M, and G suffixes. Uses table settings if not specified.");
+    opts.addOption(outHdfsBlockSizeOpt);
+    outIndexBlockSizeOpt = newLAO("out-index-bs",
+        "Rfile index block size to use for compaction output file.  Can use K,M, and G suffixes. Uses table settings if not specified.");
+    opts.addOption(outIndexBlockSizeOpt);
+    outCompressionOpt = newLAO("out-compress",
+        "Compression to use for compaction output file. Either snappy, gz, lzo, or none. Uses table settings if not specified.");
+    opts.addOption(outCompressionOpt);
+    outReplication = newLAO("out-replication", "HDFS replication to use for compaction output file. Uses table settings if not specified.");
+    opts.addOption(outReplication);
+
     return opts;
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/524a8139/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
index d878c7f..e4104ce 100644
--- a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
@@ -35,6 +35,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Random;
 
 import jline.console.ConsoleReader;
 
@@ -753,6 +754,112 @@ public class ShellServerIT extends SharedMiniClusterIT {
   }
 
   @Test
+  public void testCompactionSelection() throws Exception {
+    final String table = name.getMethodName();
+    final String clone = table + "_clone";
+
+    ts.exec("createtable " + table);
+    ts.exec("insert a b c d");
+    ts.exec("flush -w");
+    ts.exec("insert x y z v");
+    ts.exec("flush -w");
+
+    ts.exec("clonetable -s " + Property.TABLE_MAJC_RATIO.getKey() + "=10 " + table + " " + clone);
+
+    ts.exec("table " + clone);
+    ts.exec("insert m n l o");
+    ts.exec("flush -w");
+
+    String tableId = getTableId(table);
+    String cloneId = getTableId(clone);
+
+    assertEquals(3, countFiles(cloneId));
+
+    // compact only files from src table
+    ts.exec("compact -t " + clone + " -w --sf-epath .*tables/" + tableId + ".*");
+
+    assertEquals(2, countFiles(cloneId));
+
+    ts.exec("insert r s t u");
+    ts.exec("flush -w");
+
+    assertEquals(3, countFiles(cloneId));
+
+    // compact all flush files
+    ts.exec("compact -t " + clone + " -w --sf-ename F.*");
+
+    assertEquals(2, countFiles(cloneId));
+
+    // create two large files
+    Random rand = new Random();
+    StringBuilder sb = new StringBuilder("insert b v q ");
+    for (int i = 0; i < 10000; i++) {
+      sb.append('a' + rand.nextInt(26));
+    }
+
+    ts.exec(sb.toString());
+    ts.exec("flush -w");
+
+    ts.exec(sb.toString());
+    ts.exec("flush -w");
+
+    assertEquals(4, countFiles(cloneId));
+
+    // compact only small files
+    ts.exec("compact -t " + clone + " -w --sf-lt-esize 1000");
+
+    assertEquals(3, countFiles(cloneId));
+
+    // compact large files if 3 or more
+    ts.exec("compact -t " + clone + " -w --sf-gt-esize 1K --min-files 3");
+
+    assertEquals(3, countFiles(cloneId));
+
+    // compact large files if 2 or more
+    ts.exec("compact -t " + clone + " -w --sf-gt-esize 1K --min-files 2");
+
+    assertEquals(2, countFiles(cloneId));
+
+    // compact if tablet has 3 or more files
+    ts.exec("compact -t " + clone + " -w --min-files 3");
+
+    assertEquals(2, countFiles(cloneId));
+
+    // compact if tablet has 2 or more files
+    ts.exec("compact -t " + clone + " -w --min-files 2");
+
+    assertEquals(1, countFiles(cloneId));
+
+    // create two small and one large flush files in order to test AND
+    ts.exec(sb.toString());
+    ts.exec("flush -w");
+
+    ts.exec("insert m n l o");
+    ts.exec("flush -w");
+
+    ts.exec("insert m n l o");
+    ts.exec("flush -w");
+
+    assertEquals(4, countFiles(cloneId));
+
+    // should only compact two small flush files leaving large flush file
+    ts.exec("compact -t " + clone + " -w --sf-ename F.* --sf-lt-esize 1K");
+
+    assertEquals(3, countFiles(cloneId));
+  }
+
+  @Test
+  public void testCompactionSelectionAndStrategy() throws Exception {
+
+    final String table = name.getMethodName();
+
+    ts.exec("createtable " + table);
+
+    // expect this to fail
+    ts.exec("compact -t " + table + " -w --sf-ename F.* -s " + TestCompactionStrategy.class.getName() + " -sc inputPrefix=F,dropPrefix=A", false);
+  }
+
+  @Test
   public void constraint() throws Exception {
     final String table = name.getMethodName();