Posted to commits@accumulo.apache.org by kt...@apache.org on 2019/02/05 17:26:50 UTC

[accumulo] branch master updated: Merged multiple compaction strategies into one. (#935)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new bc0e76a  Merged multiple compaction strategies into one. (#935)
bc0e76a is described below

commit bc0e76a1192bb69f8acb85fdab464f6f2bcaee2b
Author: Keith Turner <kt...@apache.org>
AuthorDate: Tue Feb 5 12:26:45 2019 -0500

    Merged multiple compaction strategies into one. (#935)
    
    While testing 2.0.0-alpha-2 I found that I wanted the functionality of
    both the size limit and two tier compaction strategies. However, only
    one could be configured.  This PR merges the two into one.  Two tier
    can be dropped because it is new in 2.0.0.
---
 .../compaction/SizeLimitCompactionStrategy.java    |   4 +
 .../compaction/TwoTierCompactionStrategy.java      | 121 ---------------
 .../strategies/BasicCompactionStrategy.java        | 164 +++++++++++++++++++++
 .../strategies/ConfigurableCompactionStrategy.java |   3 +
 .../SizeLimitCompactionStrategyTest.java           |  16 +-
 .../BasicCompactionStrategyTest.java}              |  42 ++++--
 6 files changed, 213 insertions(+), 137 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java
index 0077964..4e543e2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java
@@ -24,7 +24,11 @@ import java.util.Map.Entry;
 import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.tserver.compaction.strategies.BasicCompactionStrategy;
 
+/**
+ * {@link BasicCompactionStrategy} offers the same functionality as this class and more.
+ */
 public class SizeLimitCompactionStrategy extends DefaultCompactionStrategy {
   public static final String SIZE_LIMIT_OPT = "sizeLimit";
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/TwoTierCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/TwoTierCompactionStrategy.java
deleted file mode 100644
index 7d97e94..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/TwoTierCompactionStrategy.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver.compaction;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.server.fs.FileRef;
-import org.apache.log4j.Logger;
-
-/**
- * A hybrid compaction strategy that supports two types of compression. If total size of files being
- * compacted is larger than
- * <tt>table.majc.compaction.strategy.opts.file.large.compress.threshold</tt> than the larger
- * compression type will be used. The larger compression type is specified in
- * <tt>table.majc.compaction.strategy.opts.file.large.compress.type</tt>. Otherwise, the configured
- * table compression will be used.
- *
- * NOTE: To use this strategy with Minor Compactions set <tt>table.file.compress.type=snappy</tt>
- * and set a different compress type in
- * <tt>table.majc.compaction.strategy.opts.file.large.compress.type</tt> for larger files.
- */
-public class TwoTierCompactionStrategy extends DefaultCompactionStrategy {
-  private final Logger log = Logger.getLogger(TwoTierCompactionStrategy.class);
-  /**
-   * Threshold memory in bytes. Files larger than this threshold will use
-   * <tt>table.majc.compaction.strategy.opts.file.large.compress.type</tt> for compression
-   */
-  public static final String LARGE_FILE_COMPRESSION_THRESHOLD = "file.large.compress.threshold";
-  private Long largeFileCompressionThreshold;
-
-  /**
-   * Type of compression to use if large threshold is surpassed. One of "gz","lzo","snappy", or
-   * "none"
-   */
-  public static final String LARGE_FILE_COMPRESSION_TYPE = "file.large.compress.type";
-  private String largeFileCompressionType;
-
-  /**
-   * Helper method to check for required table properties.
-   *
-   * @param objectsToVerify
-   *          any objects that shouldn't be null
-   * @throws IllegalArgumentException
-   *           if any object in {@code objectsToVerify} is null
-   *
-   */
-  public void verifyRequiredProperties(Object... objectsToVerify) throws IllegalArgumentException {
-    for (Object obj : objectsToVerify) {
-      if (obj == null) {
-        throw new IllegalArgumentException("Missing required "
-            + Property.TABLE_COMPACTION_STRATEGY_PREFIX + " (" + LARGE_FILE_COMPRESSION_TYPE
-            + " and/or " + LARGE_FILE_COMPRESSION_THRESHOLD + ") for " + this.getClass().getName());
-      }
-    }
-  }
-
-  /**
-   * Calculates the total size of input files in the compaction plan
-   */
-  private Long calculateTotalSize(MajorCompactionRequest request, CompactionPlan plan) {
-    long totalSize = 0;
-    Map<FileRef,DataFileValue> allFiles = request.getFiles();
-    for (FileRef fileRef : plan.inputFiles) {
-      totalSize += allFiles.get(fileRef).getSize();
-    }
-    return totalSize;
-  }
-
-  @Override
-  public void init(Map<String,String> options) {
-    String threshold = options.get(LARGE_FILE_COMPRESSION_THRESHOLD);
-    largeFileCompressionType = options.get(LARGE_FILE_COMPRESSION_TYPE);
-    verifyRequiredProperties(threshold, largeFileCompressionType);
-    largeFileCompressionThreshold = ConfigurationTypeHelper.getFixedMemoryAsBytes(threshold);
-  }
-
-  @Override
-  public boolean shouldCompact(MajorCompactionRequest request) {
-    return super.shouldCompact(request);
-  }
-
-  @Override
-  public void gatherInformation(MajorCompactionRequest request) throws IOException {
-    super.gatherInformation(request);
-  }
-
-  @Override
-  public CompactionPlan getCompactionPlan(MajorCompactionRequest request) {
-    CompactionPlan plan = super.getCompactionPlan(request);
-    plan.writeParameters = new WriteParameters();
-    Long totalSize = calculateTotalSize(request, plan);
-
-    if (totalSize > largeFileCompressionThreshold) {
-      if (log.isDebugEnabled()) {
-        log.debug("Changed compressType to " + largeFileCompressionType + ": totalSize(" + totalSize
-            + ") was greater than threshold " + largeFileCompressionThreshold);
-      }
-      plan.writeParameters.setCompressType(largeFileCompressionType);
-    }
-    return plan;
-  }
-
-}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/BasicCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/BasicCompactionStrategy.java
new file mode 100644
index 0000000..17ecbb2
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/BasicCompactionStrategy.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.compaction.strategies;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.tserver.compaction.CompactionPlan;
+import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
+import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
+import org.apache.accumulo.tserver.compaction.WriteParameters;
+import org.apache.log4j.Logger;
+
+/**
+ * A compaction strategy that covers the following use cases.
+ *
+ * <ul>
+ * <li>Filtering out input files larger than a specified size. These are never considered for
+ * compaction.
+ * <li>Compressing output files differently when the sum of the input files exceeds a specified
+ * size.
+ * </ul>
+ *
+ * <p>
+ * To filter out input files based on size, set
+ * {@code table.majc.compaction.strategy.opts.filter.size} to the desired size.
+ *
+ * <p>
+ * To use a different compression for larger inputs, set
+ * {@code table.majc.compaction.strategy.opts.large.compress.threshold} to a size in bytes and
+ * {@code table.majc.compaction.strategy.opts.large.compress.type} to a compression type like gz or
+ * snappy. If one of these properties is set, the other must also be set. When the total size of
+ * the files being compacted exceeds the threshold, the specified compression type is used.
+ *
+ * <p>
+ * To use this strategy with minor compactions, set {@code table.file.compress.type=snappy} and set a
+ * different compress type in {@code table.majc.compaction.strategy.opts.large.compress.type} for
+ * larger files.
+ *
+ * <p>
+ * The options that take sizes are in bytes, and the suffixes K, M, and G can be used.
+ */
+public class BasicCompactionStrategy extends DefaultCompactionStrategy {
+
+  private static final Logger log = Logger.getLogger(BasicCompactionStrategy.class);
+
+  public static final String SIZE_LIMIT_OPT = "filter.size";
+
+  /**
+   * Threshold size in bytes. When the total size of the input files exceeds this threshold,
+   * <tt>table.majc.compaction.strategy.opts.large.compress.type</tt> is used for compression.
+   */
+  public static final String LARGE_FILE_COMPRESSION_THRESHOLD = "large.compress.threshold";
+
+  /**
+   * Type of compression to use if the large threshold is surpassed. One of "gz", "lzo",
+   * "snappy", or "none".
+   */
+  public static final String LARGE_FILE_COMPRESSION_TYPE = "large.compress.type";
+
+  private Long filterSize;
+  private Long largeThresh;
+  private String largeCompress;
+
+  @Override
+  public void init(Map<String,String> options) {
+    String limitVal = options.get(SIZE_LIMIT_OPT);
+    if (limitVal != null) {
+      filterSize = ConfigurationTypeHelper.getFixedMemoryAsBytes(limitVal);
+    }
+
+    String largeThresh = options.get(LARGE_FILE_COMPRESSION_THRESHOLD);
+    String largeCompress = options.get(LARGE_FILE_COMPRESSION_TYPE);
+    if (largeThresh != null && largeCompress != null) {
+      this.largeThresh = ConfigurationTypeHelper.getFixedMemoryAsBytes(largeThresh);
+      this.largeCompress = largeCompress;
+    } else if (largeThresh != null ^ largeCompress != null) {
+      throw new IllegalArgumentException("Must set both of "
+          + Property.TABLE_COMPACTION_STRATEGY_PREFIX + " (" + LARGE_FILE_COMPRESSION_TYPE + " and "
+          + LARGE_FILE_COMPRESSION_THRESHOLD + ") or neither for " + this.getClass().getName());
+    }
+
+  }
+
+  @Override
+  public boolean shouldCompact(MajorCompactionRequest request) {
+    return super.shouldCompact(filterFiles(request));
+  }
+
+  @Override
+  public void gatherInformation(MajorCompactionRequest request) throws IOException {
+    super.gatherInformation(filterFiles(request));
+  }
+
+  @Override
+  public CompactionPlan getCompactionPlan(MajorCompactionRequest request) {
+
+    request = filterFiles(request);
+
+    CompactionPlan plan = super.getCompactionPlan(request);
+
+    if (largeThresh != null) {
+
+      Long totalSize = calculateTotalSize(request, plan);
+
+      if (totalSize > largeThresh) {
+        plan.writeParameters = new WriteParameters();
+        if (log.isDebugEnabled()) {
+          log.debug("Changed compressType to " + largeCompress + ": totalSize(" + totalSize
+              + ") was greater than threshold " + largeThresh);
+        }
+        plan.writeParameters.setCompressType(largeCompress);
+      }
+    }
+
+    return plan;
+
+  }
+
+  private MajorCompactionRequest filterFiles(MajorCompactionRequest mcr) {
+    if (filterSize != null) {
+      Map<FileRef,DataFileValue> filteredFiles = new HashMap<>();
+      mcr.getFiles().forEach((fr, dfv) -> {
+        if (dfv.getSize() <= filterSize)
+          filteredFiles.put(fr, dfv);
+      });
+
+      mcr = new MajorCompactionRequest(mcr);
+      mcr.setFiles(filteredFiles);
+    }
+    return mcr;
+  }
+
+  /**
+   * Calculates the total size of input files in the compaction plan
+   */
+  private Long calculateTotalSize(MajorCompactionRequest request, CompactionPlan plan) {
+    long totalSize = 0;
+    Map<FileRef,DataFileValue> allFiles = request.getFiles();
+    for (FileRef fileRef : plan.inputFiles) {
+      totalSize += allFiles.get(fileRef).getSize();
+    }
+    return totalSize;
+  }
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java
index 491d73e..3310c86 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java
@@ -44,6 +44,9 @@ import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
 import org.apache.accumulo.tserver.compaction.WriteParameters;
 import org.apache.hadoop.fs.Path;
 
+/**
+ * The compaction strategy used by the shell compact command.
+ */
 public class ConfigurableCompactionStrategy extends CompactionStrategy {
 
   private abstract static class Test {
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategyTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategyTest.java
index e494426..3c7ebfc 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategyTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategyTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -35,7 +36,7 @@ import org.junit.Test;
 
 public class SizeLimitCompactionStrategyTest {
 
-  private Map<FileRef,DataFileValue> nfl(String... sa) {
+  private static Map<FileRef,DataFileValue> nfl(String... sa) {
 
     HashMap<FileRef,DataFileValue> ret = new HashMap<>();
     for (int i = 0; i < sa.length; i += 2) {
@@ -46,11 +47,9 @@ public class SizeLimitCompactionStrategyTest {
     return ret;
   }
 
-  @Test
-  public void testLimits() {
-    SizeLimitCompactionStrategy slcs = new SizeLimitCompactionStrategy();
+  public static void testSizeLimit(String opt, CompactionStrategy slcs) throws IOException {
     HashMap<String,String> opts = new HashMap<>();
-    opts.put(SizeLimitCompactionStrategy.SIZE_LIMIT_OPT, "1G");
+    opts.put(opt, "1G");
 
     slcs.init(opts);
 
@@ -72,4 +71,11 @@ public class SizeLimitCompactionStrategyTest {
         new HashSet<>(slcs.getCompactionPlan(mcr).inputFiles));
     assertEquals(8, mcr.getFiles().size());
   }
+
+  @Test
+  public void testLimits() throws IOException {
+    SizeLimitCompactionStrategy slcs = new SizeLimitCompactionStrategy();
+
+    testSizeLimit(SizeLimitCompactionStrategy.SIZE_LIMIT_OPT, slcs);
+  }
 }
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/TwoTierCompactionStrategyTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/BasicCompactionStrategyTest.java
similarity index 77%
rename from server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/TwoTierCompactionStrategyTest.java
rename to server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/BasicCompactionStrategyTest.java
index 54ebdd9..26977a1 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/TwoTierCompactionStrategyTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/BasicCompactionStrategyTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.accumulo.tserver.compaction;
+package org.apache.accumulo.tserver.compaction.strategies;
 
 import static org.apache.accumulo.tserver.compaction.DefaultCompactionStrategyTest.getServerContext;
 import static org.junit.Assert.assertEquals;
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -35,15 +36,18 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.tserver.InMemoryMapTest;
+import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
+import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
+import org.apache.accumulo.tserver.compaction.SizeLimitCompactionStrategyTest;
 import org.junit.Before;
 import org.junit.Test;
 
 /**
- * Tests org.apache.accumulo.tserver.compaction.TwoTierCompactionStrategy
+ * Tests org.apache.accumulo.tserver.compaction.strategies.BasicCompactionStrategy
  */
-public class TwoTierCompactionStrategyTest {
+public class BasicCompactionStrategyTest {
   private String largeCompressionType = "gz";
-  private TwoTierCompactionStrategy ttcs = null;
+  private BasicCompactionStrategy ttcs = null;
   private MajorCompactionRequest mcr = null;
   private AccumuloConfiguration conf = null;
   private HashMap<String,String> opts = new HashMap<>();
@@ -61,9 +65,9 @@ public class TwoTierCompactionStrategyTest {
 
   @Before
   public void setup() {
-    opts.put(TwoTierCompactionStrategy.LARGE_FILE_COMPRESSION_TYPE, largeCompressionType);
-    opts.put(TwoTierCompactionStrategy.LARGE_FILE_COMPRESSION_THRESHOLD, "500M");
-    ttcs = new TwoTierCompactionStrategy();
+    opts.put(BasicCompactionStrategy.LARGE_FILE_COMPRESSION_TYPE, largeCompressionType);
+    opts.put(BasicCompactionStrategy.LARGE_FILE_COMPRESSION_THRESHOLD, "500M");
+    ttcs = new BasicCompactionStrategy();
   }
 
   @Test
@@ -83,7 +87,7 @@ public class TwoTierCompactionStrategyTest {
     List<FileRef> filesToCompact = ttcs.getCompactionPlan(mcr).inputFiles;
     assertEquals(fileMap.keySet(), new HashSet<>(filesToCompact));
     assertEquals(8, filesToCompact.size());
-    assertNull(ttcs.getCompactionPlan(mcr).writeParameters.getCompressType());
+    assertNull(ttcs.getCompactionPlan(mcr).writeParameters);
   }
 
   @Test
@@ -107,9 +111,20 @@ public class TwoTierCompactionStrategyTest {
   }
 
   @Test
-  public void testMissingConfigProperties() {
+  public void testMissingType() {
     try {
-      opts.clear();
+      opts.remove(BasicCompactionStrategy.LARGE_FILE_COMPRESSION_TYPE);
+      ttcs.init(opts);
+      fail("IllegalArgumentException should have been thrown.");
+    } catch (IllegalArgumentException iae) {} catch (Throwable t) {
+      fail("IllegalArgumentException should have been thrown.");
+    }
+  }
+
+  @Test
+  public void testMissingThreshold() {
+    try {
+      opts.remove(BasicCompactionStrategy.LARGE_FILE_COMPRESSION_THRESHOLD);
       ttcs.init(opts);
       fail("IllegalArgumentException should have been thrown.");
     } catch (IllegalArgumentException iae) {} catch (Throwable t) {
@@ -135,7 +150,12 @@ public class TwoTierCompactionStrategyTest {
     List<FileRef> filesToCompact = ttcs.getCompactionPlan(mcr).inputFiles;
     assertEquals(filesToCompactMap.keySet(), new HashSet<>(filesToCompact));
     assertEquals(6, filesToCompact.size());
-    assertNull(ttcs.getCompactionPlan(mcr).writeParameters.getCompressType());
+    assertNull(ttcs.getCompactionPlan(mcr).writeParameters);
   }
 
+  @Test
+  public void testLimits() throws IOException {
+    BasicCompactionStrategy slcs = new BasicCompactionStrategy();
+    SizeLimitCompactionStrategyTest.testSizeLimit(BasicCompactionStrategy.SIZE_LIMIT_OPT, slcs);
+  }
 }
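
For reference, a minimal sketch (not part of this commit) of how a table could be configured to use the merged strategy from the 2.0 client API. The instance name, zookeepers, credentials, and table name below are hypothetical placeholders.

    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;

    public class BasicCompactionStrategyExample {
      public static void main(String[] args) throws Exception {
        // Hypothetical connection details; replace with a real instance, zookeepers, and credentials.
        try (AccumuloClient client = Accumulo.newClient()
            .to("myInstance", "zoo1:2181").as("root", "secret").build()) {

          String table = "mytable"; // hypothetical table
          String prefix = "table.majc.compaction.strategy.opts.";

          // Point the table at the merged strategy introduced by this commit.
          client.tableOperations().setProperty(table, "table.majc.compaction.strategy",
              "org.apache.accumulo.tserver.compaction.strategies.BasicCompactionStrategy");

          // Size-limit behavior: never consider input files larger than 250M for compaction.
          client.tableOperations().setProperty(table, prefix + "filter.size", "250M");

          // Two-tier behavior: when the combined input size exceeds 1G, write the output with gz
          // instead of the table's default compression. Both properties must be set together.
          client.tableOperations().setProperty(table, prefix + "large.compress.threshold", "1G");
          client.tableOperations().setProperty(table, prefix + "large.compress.type", "gz");
        }
      }
    }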