You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2015/04/15 01:22:18 UTC

accumulo git commit: ACCUMULO-3645 Run user major compactions with iterators when tablets are empty. Removed/modified the lines in Tablet that prevented construction of the SKVI stack during a user compaction of a tablet that has no data but has iterators configured.

Repository: accumulo
Updated Branches:
  refs/heads/master 85d254e12 -> fad97f7a6


ACCUMULO-3645 Run user major compactions with iterators when tablets are empty.
Removed/modified the lines in Tablet that prevented construction of the SKVI stack
during a user compaction of a tablet that has no data but has iterators configured.
Includes HardListIterator, an iterator that generates hard-coded data for testing purposes.
Includes integration tests in accumulo-test.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/fad97f7a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/fad97f7a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/fad97f7a

Branch: refs/heads/master
Commit: fad97f7a667dd34dcdce576fc0a1ac3514db434b
Parents: 85d254e
Author: Dylan Hutchison <dh...@mit.edu>
Authored: Tue Apr 14 16:49:20 2015 -0400
Committer: Dylan Hutchison <dh...@mit.edu>
Committed: Tue Apr 14 16:52:31 2015 -0400

----------------------------------------------------------------------
 .../EverythingCompactionStrategy.java           |   2 +-
 .../apache/accumulo/tserver/tablet/Tablet.java  |  63 ++++-----
 .../apache/accumulo/test/HardListIterator.java  | 115 +++++++++++++++++
 .../apache/accumulo/test/TableOperationsIT.java | 127 +++++++++++++++++++
 4 files changed, 277 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/fad97f7a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java
index 9295c30..2710177 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java
@@ -27,7 +27,7 @@ public class EverythingCompactionStrategy extends CompactionStrategy {
 
   @Override
   public boolean shouldCompact(MajorCompactionRequest request) throws IOException {
-    return request.getFiles().size() > 0;
+    return true; // ACCUMULO-3645: compact even when there are no files, so user-compaction iterators still run
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/fad97f7a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 2342789..7c152b0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1863,21 +1863,26 @@ public class Tablet implements TabletCommitter {
 
       if (inputFiles.isEmpty()) {
         if (reason == MajorCompactionReason.USER) {
-          // no work to do
-          lastCompactID = compactionId.getFirst();
-          updateCompactionID = true;
+          if (compactionId.getSecond().getIterators().isEmpty()) {
+            log.debug("No-op major compaction by USER on 0 input files because no iterators present.");
+            lastCompactID = compactionId.getFirst();
+            updateCompactionID = true;
+          } else {
+            log.debug("Major compaction by USER on 0 input files with iterators.");
+            filesToCompact = new HashMap<>();
+          }
         } else {
           return majCStats;
         }
       } else {
         // If no original files will exist at the end of the compaction, we do not have to propogate deletes
-        Set<FileRef> droppedFiles = new HashSet<FileRef>();
+        Set<FileRef> droppedFiles = new HashSet<>();
         droppedFiles.addAll(inputFiles);
         if (plan != null)
           droppedFiles.addAll(plan.deleteFiles);
         propogateDeletes = !(droppedFiles.equals(allFiles.keySet()));
         log.debug("Major compaction plan: " + plan + " propogate deletes : " + propogateDeletes);
-        filesToCompact = new HashMap<FileRef,DataFileValue>(allFiles);
+        filesToCompact = new HashMap<>(allFiles);
         filesToCompact.keySet().retainAll(inputFiles);
 
         getDatafileManager().reserveMajorCompactingFiles(filesToCompact.keySet());
@@ -1916,6 +1921,7 @@ public class Tablet implements TabletCommitter {
             // compaction was canceled
             return majCStats;
           }
+          compactionIterators = compactionId.getSecond().getIterators();
 
           synchronized (this) {
             if (lastCompactID >= compactionId.getFirst())
@@ -1924,12 +1930,11 @@ public class Tablet implements TabletCommitter {
           }
         }
 
-        compactionIterators = compactionId.getSecond().getIterators();
       }
 
       // need to handle case where only one file is being major compacted
-      while (filesToCompact.size() > 0) {
-
+      // ACCUMULO-3645 run loop at least once, even if filesToCompact.isEmpty()
+      do {
         int numToCompact = maxFilesToCompact;
 
         if (filesToCompact.size() > maxFilesToCompact && filesToCompact.size() < 2 * maxFilesToCompact) {
@@ -1995,7 +2000,7 @@ public class Tablet implements TabletCommitter {
           span.stop();
         }
 
-      }
+      } while (filesToCompact.size() > 0);
       return majCStats;
     } finally {
       synchronized (Tablet.this) {
@@ -2025,6 +2030,13 @@ public class Tablet implements TabletCommitter {
   private Set<FileRef> removeSmallest(Map<FileRef,DataFileValue> filesToCompact, int maxFilesToCompact) {
     // ensure this method works properly when multiple files have the same size
 
+    // short-circuit; also handles zero files case
+    if (filesToCompact.size() <= maxFilesToCompact) {
+      Set<FileRef> smallestFiles = new HashSet<FileRef>(filesToCompact.keySet());
+      filesToCompact.clear();
+      return smallestFiles;
+    }
+
     PriorityQueue<Pair<FileRef,Long>> fileHeap = new PriorityQueue<Pair<FileRef,Long>>(filesToCompact.size(), new Comparator<Pair<FileRef,Long>>() {
       @Override
       public int compare(Pair<FileRef,Long> o1, Pair<FileRef,Long> o2) {
@@ -2565,29 +2577,22 @@ public class Tablet implements TabletCommitter {
       if (isClosing() || isClosed() || majorCompactionQueued.contains(MajorCompactionReason.USER) || isMajorCompactionRunning())
         return;
 
-      if (getDatafileManager().getDatafileSizes().size() == 0) {
-        // no files, so jsut update the metadata table
-        majorCompactionState = CompactionState.IN_PROGRESS;
-        updateMetadata = true;
-        lastCompactID = compactionId;
-      } else {
-        CompactionStrategyConfig strategyConfig = compactionConfig.getCompactionStrategy();
-        CompactionStrategy strategy = createCompactionStrategy(strategyConfig);
+      CompactionStrategyConfig strategyConfig = compactionConfig.getCompactionStrategy();
+      CompactionStrategy strategy = createCompactionStrategy(strategyConfig);
 
-        MajorCompactionRequest request = new MajorCompactionRequest(extent, MajorCompactionReason.USER, getTabletServer().getFileSystem(), tableConfiguration);
-        request.setFiles(getDatafileManager().getDatafileSizes());
+      MajorCompactionRequest request = new MajorCompactionRequest(extent, MajorCompactionReason.USER, getTabletServer().getFileSystem(), tableConfiguration);
+      request.setFiles(getDatafileManager().getDatafileSizes());
 
-        try {
-          if (strategy.shouldCompact(request)) {
-            initiateMajorCompaction(MajorCompactionReason.USER);
-          } else {
-            majorCompactionState = CompactionState.IN_PROGRESS;
-            updateMetadata = true;
-            lastCompactID = compactionId;
-          }
-        } catch (IOException e) {
-          throw new RuntimeException(e);
+      try {
+        if (strategy.shouldCompact(request)) {
+          initiateMajorCompaction(MajorCompactionReason.USER);
+        } else {
+          majorCompactionState = CompactionState.IN_PROGRESS;
+          updateMetadata = true;
+          lastCompactID = compactionId;
         }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/fad97f7a/test/src/main/java/org/apache/accumulo/test/HardListIterator.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/HardListIterator.java b/test/src/main/java/org/apache/accumulo/test/HardListIterator.java
new file mode 100644
index 0000000..bd9cbf1
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/HardListIterator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * A test-only SKVI that ignores any source and emits the fixed entries in {@link #allEntriesToInject}, limited to the range of the last seek.
+ */
+public class HardListIterator implements SortedKeyValueIterator<Key,Value> {
+  private static final Logger log = Logger.getLogger(HardListIterator.class);
+  public static final SortedMap<Key,Value> allEntriesToInject;
+  static {
+    SortedMap<Key,Value> entries = new TreeMap<>();
+    entries.put(new Key(new Text("a1"), new Text("colF3"), new Text("colQ3"), System.currentTimeMillis()), new Value("1".getBytes()));
+    entries.put(new Key(new Text("c1"), new Text("colF3"), new Text("colQ3"), System.currentTimeMillis()), new Value("1".getBytes()));
+    entries.put(new Key(new Text("m1"), new Text("colF3"), new Text("colQ3"), System.currentTimeMillis()), new Value("1".getBytes()));
+    allEntriesToInject = Collections.unmodifiableSortedMap(entries); // read-only, since it is shared by all instances and exposed publicly
+  }
+
+  private PeekingIterator<Map.Entry<Key,Value>> inner; // cursor over allEntriesToInject
+  private Range seekRng; // range of the last seek (infinite after init); hasTop() hides entries outside it
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    if (source != null)
+      log.info("HardListIterator ignores/replaces parent source passed in init(): " + source);
+
+    IteratorUtil.IteratorScope scope = env == null ? null : env.getIteratorScope(); // tolerate a null env, e.g. direct use in unit tests
+    log.debug(this.getClass() + ": init on scope " + scope + (scope == IteratorUtil.IteratorScope.majc ? " fullScan=" + env.isFullMajorCompaction() : ""));
+
+    seekRng = new Range(); // before any seek(), act as if seeked to (-inf, +inf); a null range would make hasTop() throw
+    inner = new PeekingIterator<>(allEntriesToInject.entrySet().iterator());
+  }
+
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    HardListIterator newInstance = new HardListIterator();
+    // the copy shares this iterator's seek range, so its hasTop() works before it is seeked itself
+    newInstance.seekRng = seekRng;
+    // resume from the current position; if this iterator is exhausted, so is the copy
+    SortedMap<Key,Value> remaining = inner.hasNext() ? allEntriesToInject.tailMap(inner.peek().getKey())
+        : new TreeMap<Key,Value>();
+    newInstance.inner = new PeekingIterator<>(remaining.entrySet().iterator());
+
+    return newInstance;
+  }
+
+  @Override
+  public boolean hasTop() {
+    // the next entry is visible only while it lies inside the last seek() range
+    if (!inner.hasNext())
+      return false;
+    return seekRng.contains(inner.peek().getKey());
+  }
+
+  @Override
+  public void next() throws IOException {
+    inner.next();
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    seekRng = range; // columnFamilies and inclusive are ignored: no column-family filtering is done
+    SortedMap<Key,Value> fromStart; // entries from the range's start onward; hasTop() enforces the end of the range
+    if (range.isInfiniteStartKey())
+      fromStart = allEntriesToInject;
+    else if (range.isStartKeyInclusive())
+      fromStart = allEntriesToInject.tailMap(range.getStartKey());
+    else // exclusive start: begin just after the start key
+      fromStart = allEntriesToInject.tailMap(range.getStartKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME));
+    inner = new PeekingIterator<>(fromStart.entrySet().iterator());
+  }
+
+  @Override
+  public Key getTopKey() {
+    return hasTop() ? inner.peek().getKey() : null;
+  }
+
+  @Override
+  public Value getTopValue() {
+    return hasTop() ? inner.peek().getValue() : null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/fad97f7a/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java b/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
index 083a77a..0127e6e 100644
--- a/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
@@ -20,15 +20,19 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -36,6 +40,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -45,14 +50,18 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.constraints.DefaultKeySizeConstraint;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.accumulo.test.functional.BadIterator;
 import org.apache.hadoop.io.Text;
 import org.apache.thrift.TException;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -229,4 +238,122 @@ public class TableOperationsIT extends AccumuloClusterIT {
     return map;
   }
 
+  @Test
+  public void testCompactEmptyTableWithGeneratorIterator() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    String tableName = getUniqueNames(1)[0];
+    connector.tableOperations().create(tableName);
+    // a user compaction of the still-empty table is expected to produce HardListIterator's fixed entries
+    List<IteratorSetting> list = new ArrayList<>();
+    list.add(new IteratorSetting(15, HardListIterator.class));
+    connector.tableOperations().compact(tableName, null, null, list, true, true);
+
+    Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+    Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ
+    for (Map.Entry<Key,Value> entry : scanner)
+      actual.put(entry.getKey(), entry.getValue());
+    connector.tableOperations().delete(tableName); // clean up before asserting so a failed assertion does not leak the table
+    assertEquals(HardListIterator.allEntriesToInject, actual);
+  }
+
+  /** Compares only the row, column family and column qualifier; keys differing only in later fields compare as equal. */
+  static class KeyRowColFColQComparator implements Comparator<Key> {
+    @Override
+    public int compare(Key k1, Key k2) {
+      return k1.compareTo(k2, PartialKey.ROW_COLFAM_COLQUAL);
+    }
+  }
+
+  static final KeyRowColFColQComparator COMPARE_KEY_TO_COLQ = new KeyRowColFColQComparator(); // shared, stateless instance for scan-result maps
+
+  @Test
+  public void testCompactEmptyTableWithGeneratorIterator_Splits() throws TableExistsException, AccumuloException, AccumuloSecurityException,
+      TableNotFoundException {
+    String tableName = getUniqueNames(1)[0];
+    connector.tableOperations().create(tableName);
+    SortedSet<Text> splitset = new TreeSet<>();
+    splitset.add(new Text("f"));
+    connector.tableOperations().addSplits(tableName, splitset);
+    // compact both empty tablets (split at "f"); together they should produce exactly the generated entries
+    List<IteratorSetting> list = new ArrayList<>();
+    list.add(new IteratorSetting(15, HardListIterator.class));
+    connector.tableOperations().compact(tableName, null, null, list, true, true);
+
+    Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+    Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ
+    for (Map.Entry<Key,Value> entry : scanner)
+      actual.put(entry.getKey(), entry.getValue());
+    connector.tableOperations().delete(tableName); // clean up before asserting so a failed assertion does not leak the table
+    assertEquals(HardListIterator.allEntriesToInject, actual);
+  }
+
+  @Test
+  public void testCompactEmptyTableWithGeneratorIterator_Splits_Cancel() throws TableExistsException, AccumuloException, AccumuloSecurityException,
+      TableNotFoundException {
+    String tableName = getUniqueNames(1)[0];
+    connector.tableOperations().create(tableName);
+    SortedSet<Text> splitset = new TreeSet<>();
+    splitset.add(new Text("f"));
+    connector.tableOperations().addSplits(tableName, splitset);
+
+    List<IteratorSetting> list = new ArrayList<>();
+    list.add(new IteratorSetting(15, HardListIterator.class));
+    connector.tableOperations().compact(tableName, null, null, list, true, false); // don't block
+    connector.tableOperations().cancelCompaction(tableName);
+    // depending on timing, compaction will finish or be canceled; NOTE(review): if only one of the two tablets finishes, the assertTrue below fails
+
+    Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+    Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ
+    for (Map.Entry<Key,Value> entry : scanner)
+      actual.put(entry.getKey(), entry.getValue());
+    connector.tableOperations().delete(tableName); // delete first: a failed Assume below would otherwise skip this cleanup and leak the table
+    Assume.assumeFalse("Compaction successfully occurred due to weird timing but we hoped it would cancel.",
+        HardListIterator.allEntriesToInject.equals(actual));
+    assertTrue("Scan should be empty if compaction canceled. " + "Actual is " + actual, actual.isEmpty());
+  }
+
+  @Test
+  public void testCompactEmptyTableWithGeneratorIterator_Splits_Partial() throws TableExistsException, AccumuloException, AccumuloSecurityException,
+      TableNotFoundException {
+    String tableName = getUniqueNames(1)[0];
+    connector.tableOperations().create(tableName);
+    Text splitRow = new Text("f");
+    SortedSet<Text> splitset = new TreeSet<>();
+    splitset.add(splitRow);
+    connector.tableOperations().addSplits(tableName, splitset);
+
+    List<IteratorSetting> list = new ArrayList<>();
+    list.add(new IteratorSetting(15, HardListIterator.class));
+    // compact the second tablet, not the first
+    connector.tableOperations().compact(tableName, splitRow, null, list, true, true);
+
+    Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+    Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ
+    for (Map.Entry<Key,Value> entry : scanner)
+      actual.put(entry.getKey(), entry.getValue());
+    connector.tableOperations().delete(tableName); // clean up before asserting so a failed assertion does not leak the table
+    // only expect the entries in the second tablet
+    assertEquals(HardListIterator.allEntriesToInject.tailMap(new Key(splitRow)), actual);
+  }
+
+  /** Tests recovery from a failing majc iterator (BadIterator) by canceling the compaction; the empty table must stay empty. */
+  @Test
+  public void testCompactEmptyTablesWithBadIterator_FailsAndCancel() throws TableExistsException, AccumuloException, AccumuloSecurityException,
+      TableNotFoundException {
+    String tableName = getUniqueNames(1)[0];
+    connector.tableOperations().create(tableName);
+
+    List<IteratorSetting> list = new ArrayList<>();
+    list.add(new IteratorSetting(15, BadIterator.class));
+    connector.tableOperations().compact(tableName, null, null, list, true, false); // don't block
+    UtilWaitThread.sleep(2000); // give the compaction time to start before canceling it
+    connector.tableOperations().cancelCompaction(tableName);
+
+    Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+    Map<Key,Value> actual = new TreeMap<>();
+    for (Map.Entry<Key,Value> entry : scanner)
+      actual.put(entry.getKey(), entry.getValue());
+    connector.tableOperations().delete(tableName); // clean up before asserting so a failed assertion does not leak the table
+    assertTrue("Should be empty. Actual is " + actual, actual.isEmpty());
+  }
+
 }