You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2015/01/15 16:26:47 UTC

accumulo git commit: ACCUMULO-3467 fixed bug with concurrent compactions

Repository: accumulo
Updated Branches:
  refs/heads/master be4aade67 -> 47a091ade


ACCUMULO-3467 fixed bug with concurrent compactions


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/47a091ad
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/47a091ad
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/47a091ad

Branch: refs/heads/master
Commit: 47a091adec4a40cdf6852cc0f8c432c15034cb5e
Parents: be4aade
Author: Keith Turner <kt...@apache.org>
Authored: Thu Jan 15 10:20:52 2015 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Thu Jan 15 10:20:52 2015 -0500

----------------------------------------------------------------------
 .../core/client/admin/CompactionConfig.java     |  9 ++---
 .../client/admin/CompactionStrategyConfig.java  | 10 ++++++
 .../core/client/admin/TableOperations.java      |  3 +-
 .../impl/CompactionStrategyConfigUtil.java      |  9 +++++
 .../master/tableOps/UserCompactionConfig.java   |  1 +
 .../accumulo/master/tableOps/CompactRange.java  | 12 +++++--
 .../accumulo/test/UserCompactionStrategyIT.java | 35 ++++++++++++++++++++
 7 files changed, 69 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/47a091ad/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionConfig.java b/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionConfig.java
index 38e5efd..064d836 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionConfig.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionConfig.java
@@ -20,9 +20,9 @@ package org.apache.accumulo.core.client.admin;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.impl.CompactionStrategyConfigUtil;
 import org.apache.hadoop.io.Text;
 
 import com.google.common.base.Preconditions;
@@ -39,12 +39,7 @@ public class CompactionConfig {
   private boolean flush = true;
   private boolean wait = true;
   private List<IteratorSetting> iterators = Collections.emptyList();
-  private CompactionStrategyConfig compactionStrategy = new CompactionStrategyConfig("org.apache.accumulo.tserver.compaction.EverythingCompactionStrategy") {
-    @Override
-    public CompactionStrategyConfig setOptions(Map<String,String> opts) {
-      throw new UnsupportedOperationException();
-    }
-  };
+  private CompactionStrategyConfig compactionStrategy = CompactionStrategyConfigUtil.DEFAULT_STRATEGY;
 
   /**
    * @param start

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47a091ad/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionStrategyConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionStrategyConfig.java b/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionStrategyConfig.java
index c23b511..0992ba9 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionStrategyConfig.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionStrategyConfig.java
@@ -71,4 +71,14 @@ public class CompactionStrategyConfig {
   public Map<String,String> getOptions() {
     return Collections.unmodifiableMap(options);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof CompactionStrategyConfig) {
+      CompactionStrategyConfig ocsc = (CompactionStrategyConfig) o;
+      return className.equals(ocsc.className) && options.equals(ocsc.options);
+    }
+
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47a091ad/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
index 5c1260c..41021b1 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
@@ -300,7 +300,8 @@ public interface TableOperations {
    * @param end
    *          last tablet to be merged contains this row, null means the last tablet in table
    * @param iterators
-   *          A set of iterators that will be applied to each tablet compacted
+   *          A set of iterators that will be applied to each tablet compacted. If two or more concurrent calls to compact pass iterators, then only one will
+   *          succeed and the others will fail.
    * @param flush
    *          when true, table memory is flushed before compaction starts
    * @param wait

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47a091ad/core/src/main/java/org/apache/accumulo/core/client/impl/CompactionStrategyConfigUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/CompactionStrategyConfigUtil.java b/core/src/main/java/org/apache/accumulo/core/client/impl/CompactionStrategyConfigUtil.java
index 8dce877..758f445 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/CompactionStrategyConfigUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/CompactionStrategyConfigUtil.java
@@ -25,12 +25,21 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
 
 public class CompactionStrategyConfigUtil {
 
+  public static final CompactionStrategyConfig DEFAULT_STRATEGY = new CompactionStrategyConfig(
+      "org.apache.accumulo.tserver.compaction.EverythingCompactionStrategy") {
+    @Override
+    public CompactionStrategyConfig setOptions(Map<String,String> opts) {
+      throw new UnsupportedOperationException();
+    }
+  };
+
   private static final int MAGIC = 0xcc5e6024;
 
   public static void encode(DataOutput dout, CompactionStrategyConfig csc) throws IOException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47a091ad/server/base/src/main/java/org/apache/accumulo/server/master/tableOps/UserCompactionConfig.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/tableOps/UserCompactionConfig.java b/server/base/src/main/java/org/apache/accumulo/server/master/tableOps/UserCompactionConfig.java
index 98d7fd7..02c6ac3 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/tableOps/UserCompactionConfig.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/tableOps/UserCompactionConfig.java
@@ -46,6 +46,7 @@ public class UserCompactionConfig implements Writable {
     startRow = null;
     endRow = null;
     iterators = Collections.emptyList();
+    compactionStrategy = CompactionStrategyConfigUtil.DEFAULT_STRATEGY;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47a091ad/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
index fd7decf..580852d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
+import org.apache.accumulo.core.client.impl.CompactionStrategyConfigUtil;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
@@ -61,6 +62,8 @@ import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 
+import com.google.common.base.Preconditions;
+
 class CompactionDriver extends MasterRepo {
 
   private static final long serialVersionUID = 1L;
@@ -207,11 +210,16 @@ public class CompactRange extends MasterRepo {
 
   public CompactRange(String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators, CompactionStrategyConfig compactionStrategy)
       throws ThriftTableOperationException {
+
+    Preconditions.checkNotNull(tableId, "Invalid argument: null tableId");
+    Preconditions.checkNotNull(iterators, "Invalid argument: null iterator list");
+    Preconditions.checkNotNull(compactionStrategy, "Invalid argument: null compactionStrategy");
+
     this.tableId = tableId;
     this.startRow = startRow.length == 0 ? null : startRow;
     this.endRow = endRow.length == 0 ? null : endRow;
 
-    if (iterators.size() > 0 || compactionStrategy != null) {
+    if (iterators.size() > 0 || !compactionStrategy.equals(CompactionStrategyConfigUtil.DEFAULT_STRATEGY)) {
       this.config = WritableUtils.toByteArray(new UserCompactionConfig(this.startRow, this.endRow, iterators, compactionStrategy));
     }
 
@@ -249,7 +257,7 @@ public class CompactRange extends MasterRepo {
               continue; // skip self
 
             throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER,
-                "Another compaction with iterators is running");
+                "Another compaction with iterators and/or a compaction strategy is running");
           }
 
           StringBuilder encodedIterators = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47a091ad/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java b/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
index 5421f52..7a3162b 100644
--- a/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
@@ -26,6 +26,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
@@ -44,6 +45,7 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.harness.AccumuloClusterIT;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.test.functional.FunctionalTestUtils;
+import org.apache.accumulo.test.functional.SlowIterator;
 import org.apache.accumulo.tserver.compaction.CompactionPlan;
 import org.apache.accumulo.tserver.compaction.CompactionStrategy;
 import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
@@ -300,6 +302,39 @@ public class UserCompactionStrategyIT extends AccumuloClusterIT {
 
   }
 
+  @Test
+  public void testConcurrent() throws Exception {
+    // two compactions without iterators or strategy should be able to run concurrently
+
+    Connector c = getConnector();
+
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+
+    // write random data because its very unlikely it will compress
+    writeRandomValue(c, tableName, 1 << 16);
+    writeRandomValue(c, tableName, 1 << 16);
+
+    c.tableOperations().compact(tableName, new CompactionConfig().setWait(false));
+    c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+    Assert.assertEquals(1, FunctionalTestUtils.countRFiles(c, tableName));
+
+    writeRandomValue(c, tableName, 1 << 16);
+
+    IteratorSetting iterConfig = new IteratorSetting(30, SlowIterator.class);
+    SlowIterator.setSleepTime(iterConfig, 1000);
+
+    long t1 = System.currentTimeMillis();
+    c.tableOperations().compact(tableName, new CompactionConfig().setWait(false).setIterators(Arrays.asList(iterConfig)));
+    try {
+      // this compaction should fail because previous one set iterators
+      c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+      if (System.currentTimeMillis() - t1 < 2000)
+        Assert.fail("Expected compaction to fail because another concurrent compaction set iterators");
+    } catch (AccumuloException e) {}
+  }
+
   void writeRandomValue(Connector c, String tableName, int size) throws Exception {
     Random rand = new Random();