You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2015/04/15 01:22:18 UTC
accumulo git commit: ACCUMULO-3645 Run user major compactions with
iterators when tablets are empty. Removed/modified lines in Tablet that stop
construction of the SKVI stack when a tablet has no data and there are
iterators present and it is a user compaction.
Repository: accumulo
Updated Branches:
refs/heads/master 85d254e12 -> fad97f7a6
ACCUMULO-3645 Run user major compactions with iterators when tablets are empty.
Removed/modified lines in Tablet that stop construction of the SKVI stack
when a tablet has no data and there are iterators present and it is a user compaction.
Includes HardListIterator, an iterator that generates data for testing purposes.
Includes integration tests in accumulo-test.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/fad97f7a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/fad97f7a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/fad97f7a
Branch: refs/heads/master
Commit: fad97f7a667dd34dcdce576fc0a1ac3514db434b
Parents: 85d254e
Author: Dylan Hutchison <dh...@mit.edu>
Authored: Tue Apr 14 16:49:20 2015 -0400
Committer: Dylan Hutchison <dh...@mit.edu>
Committed: Tue Apr 14 16:52:31 2015 -0400
----------------------------------------------------------------------
.../EverythingCompactionStrategy.java | 2 +-
.../apache/accumulo/tserver/tablet/Tablet.java | 63 ++++-----
.../apache/accumulo/test/HardListIterator.java | 115 +++++++++++++++++
.../apache/accumulo/test/TableOperationsIT.java | 127 +++++++++++++++++++
4 files changed, 277 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/fad97f7a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java
index 9295c30..2710177 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java
@@ -27,7 +27,7 @@ public class EverythingCompactionStrategy extends CompactionStrategy {
@Override
public boolean shouldCompact(MajorCompactionRequest request) throws IOException {
- return request.getFiles().size() > 0;
+ return true; // ACCUMULO-3645 compact for empty files too
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/fad97f7a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 2342789..7c152b0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1863,21 +1863,26 @@ public class Tablet implements TabletCommitter {
if (inputFiles.isEmpty()) {
if (reason == MajorCompactionReason.USER) {
- // no work to do
- lastCompactID = compactionId.getFirst();
- updateCompactionID = true;
+ if (compactionId.getSecond().getIterators().isEmpty()) {
+ log.debug("No-op major compaction by USER on 0 input files because no iterators present.");
+ lastCompactID = compactionId.getFirst();
+ updateCompactionID = true;
+ } else {
+ log.debug("Major compaction by USER on 0 input files with iterators.");
+ filesToCompact = new HashMap<>();
+ }
} else {
return majCStats;
}
} else {
// If no original files will exist at the end of the compaction, we do not have to propogate deletes
- Set<FileRef> droppedFiles = new HashSet<FileRef>();
+ Set<FileRef> droppedFiles = new HashSet<>();
droppedFiles.addAll(inputFiles);
if (plan != null)
droppedFiles.addAll(plan.deleteFiles);
propogateDeletes = !(droppedFiles.equals(allFiles.keySet()));
log.debug("Major compaction plan: " + plan + " propogate deletes : " + propogateDeletes);
- filesToCompact = new HashMap<FileRef,DataFileValue>(allFiles);
+ filesToCompact = new HashMap<>(allFiles);
filesToCompact.keySet().retainAll(inputFiles);
getDatafileManager().reserveMajorCompactingFiles(filesToCompact.keySet());
@@ -1916,6 +1921,7 @@ public class Tablet implements TabletCommitter {
// compaction was canceled
return majCStats;
}
+ compactionIterators = compactionId.getSecond().getIterators();
synchronized (this) {
if (lastCompactID >= compactionId.getFirst())
@@ -1924,12 +1930,11 @@ public class Tablet implements TabletCommitter {
}
}
- compactionIterators = compactionId.getSecond().getIterators();
}
// need to handle case where only one file is being major compacted
- while (filesToCompact.size() > 0) {
-
+ // ACCUMULO-3645 run loop at least once, even if filesToCompact.isEmpty()
+ do {
int numToCompact = maxFilesToCompact;
if (filesToCompact.size() > maxFilesToCompact && filesToCompact.size() < 2 * maxFilesToCompact) {
@@ -1995,7 +2000,7 @@ public class Tablet implements TabletCommitter {
span.stop();
}
- }
+ } while (filesToCompact.size() > 0);
return majCStats;
} finally {
synchronized (Tablet.this) {
@@ -2025,6 +2030,13 @@ public class Tablet implements TabletCommitter {
private Set<FileRef> removeSmallest(Map<FileRef,DataFileValue> filesToCompact, int maxFilesToCompact) {
// ensure this method works properly when multiple files have the same size
+ // short-circuit; also handles zero files case
+ if (filesToCompact.size() <= maxFilesToCompact) {
+ Set<FileRef> smallestFiles = new HashSet<FileRef>(filesToCompact.keySet());
+ filesToCompact.clear();
+ return smallestFiles;
+ }
+
PriorityQueue<Pair<FileRef,Long>> fileHeap = new PriorityQueue<Pair<FileRef,Long>>(filesToCompact.size(), new Comparator<Pair<FileRef,Long>>() {
@Override
public int compare(Pair<FileRef,Long> o1, Pair<FileRef,Long> o2) {
@@ -2565,29 +2577,22 @@ public class Tablet implements TabletCommitter {
if (isClosing() || isClosed() || majorCompactionQueued.contains(MajorCompactionReason.USER) || isMajorCompactionRunning())
return;
- if (getDatafileManager().getDatafileSizes().size() == 0) {
- // no files, so jsut update the metadata table
- majorCompactionState = CompactionState.IN_PROGRESS;
- updateMetadata = true;
- lastCompactID = compactionId;
- } else {
- CompactionStrategyConfig strategyConfig = compactionConfig.getCompactionStrategy();
- CompactionStrategy strategy = createCompactionStrategy(strategyConfig);
+ CompactionStrategyConfig strategyConfig = compactionConfig.getCompactionStrategy();
+ CompactionStrategy strategy = createCompactionStrategy(strategyConfig);
- MajorCompactionRequest request = new MajorCompactionRequest(extent, MajorCompactionReason.USER, getTabletServer().getFileSystem(), tableConfiguration);
- request.setFiles(getDatafileManager().getDatafileSizes());
+ MajorCompactionRequest request = new MajorCompactionRequest(extent, MajorCompactionReason.USER, getTabletServer().getFileSystem(), tableConfiguration);
+ request.setFiles(getDatafileManager().getDatafileSizes());
- try {
- if (strategy.shouldCompact(request)) {
- initiateMajorCompaction(MajorCompactionReason.USER);
- } else {
- majorCompactionState = CompactionState.IN_PROGRESS;
- updateMetadata = true;
- lastCompactID = compactionId;
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
+ try {
+ if (strategy.shouldCompact(request)) {
+ initiateMajorCompaction(MajorCompactionReason.USER);
+ } else {
+ majorCompactionState = CompactionState.IN_PROGRESS;
+ updateMetadata = true;
+ lastCompactID = compactionId;
}
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/fad97f7a/test/src/main/java/org/apache/accumulo/test/HardListIterator.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/HardListIterator.java b/test/src/main/java/org/apache/accumulo/test/HardListIterator.java
new file mode 100644
index 0000000..bd9cbf1
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/HardListIterator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * A wrapper making a list of hardcoded data into a SKVI. For testing.
+ * <p>
+ * Any source passed to {@link #init} is ignored; the entries served always come from {@link #allEntriesToInject}, restricted to the range given to the most
+ * recent {@link #seek}. Before the first seek the iterator behaves as if it had been sought over the infinite range.
+ */
+public class HardListIterator implements SortedKeyValueIterator<Key,Value> {
+  private static final Logger log = Logger.getLogger(HardListIterator.class);
+
+  /** The fixed, read-only entries this iterator emits. Timestamps are taken from the clock when the class is loaded. */
+  public final static SortedMap<Key,Value> allEntriesToInject;
+  static {
+    SortedMap<Key,Value> t = new TreeMap<>();
+    t.put(new Key(new Text("a1"), new Text("colF3"), new Text("colQ3"), System.currentTimeMillis()), new Value("1".getBytes()));
+    t.put(new Key(new Text("c1"), new Text("colF3"), new Text("colQ3"), System.currentTimeMillis()), new Value("1".getBytes()));
+    t.put(new Key(new Text("m1"), new Text("colF3"), new Text("colQ3"), System.currentTimeMillis()), new Value("1".getBytes()));
+    allEntriesToInject = Collections.unmodifiableSortedMap(t); // for safety
+  }
+
+  /** Cursor over the not-yet-consumed portion of {@link #allEntriesToInject}. */
+  private PeekingIterator<Map.Entry<Key,Value>> inner;
+  /** Range of the most recent seek; entries outside it are never reported as the top. */
+  private Range seekRng;
+
+  /**
+   * Positions the iterator at the first hardcoded entry. The {@code source} and {@code options} arguments are not used.
+   */
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    if (source != null)
+      log.info("HardListIterator ignores/replaces parent source passed in init(): " + source);
+
+    IteratorUtil.IteratorScope scope = env.getIteratorScope();
+    log.debug(this.getClass() + ": init on scope " + scope + (scope == IteratorUtil.IteratorScope.majc ? " fullScan=" + env.isFullMajorCompaction() : ""));
+
+    // define behavior before seek as seek to start at negative infinity;
+    // the range must be set as well, otherwise hasTop() would dereference a null range
+    seekRng = new Range();
+    inner = new PeekingIterator<>(allEntriesToInject.entrySet().iterator());
+  }
+
+  /**
+   * Returns an independent iterator positioned at the same entry and limited to the same seek range as this one.
+   */
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    HardListIterator copy = new HardListIterator();
+    // carry the seek range over so the copy's hasTop() applies the same bounds
+    copy.seekRng = seekRng;
+    if (inner.hasNext()) {
+      copy.inner = new PeekingIterator<>(allEntriesToInject.tailMap(inner.peek().getKey()).entrySet().iterator());
+    } else {
+      // this iterator is exhausted, so there is no key to resume from; the copy is exhausted too
+      copy.inner = new PeekingIterator<>(Collections.<Map.Entry<Key,Value>> emptyIterator());
+    }
+    return copy;
+  }
+
+  @Override
+  public boolean hasTop() {
+    if (!inner.hasNext())
+      return false;
+    Key k = inner.peek().getKey();
+    // do not return entries past the seek() range; a missing range is treated as unbounded
+    return seekRng == null || seekRng.contains(k);
+  }
+
+  @Override
+  public void next() throws IOException {
+    inner.next();
+  }
+
+  /**
+   * Repositions the iterator at the first hardcoded entry at or after the start of {@code range}. The column family arguments are not consulted.
+   */
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    seekRng = range;
+    // seek to first entry inside range
+    if (range.isInfiniteStartKey())
+      inner = new PeekingIterator<>(allEntriesToInject.entrySet().iterator());
+    else if (range.isStartKeyInclusive())
+      inner = new PeekingIterator<>(allEntriesToInject.tailMap(range.getStartKey()).entrySet().iterator());
+    else
+      inner = new PeekingIterator<>(allEntriesToInject.tailMap(range.getStartKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME)).entrySet()
+          .iterator());
+  }
+
+  /** Returns the current key, or {@code null} when there is no top entry. */
+  @Override
+  public Key getTopKey() {
+    return hasTop() ? inner.peek().getKey() : null;
+  }
+
+  /** Returns the current value, or {@code null} when there is no top entry. */
+  @Override
+  public Value getTopValue() {
+    return hasTop() ? inner.peek().getValue() : null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/fad97f7a/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java b/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
index 083a77a..0127e6e 100644
--- a/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
@@ -20,15 +20,19 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.accumulo.core.client.AccumuloException;
@@ -36,6 +40,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -45,14 +50,18 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.constraints.DefaultKeySizeConstraint;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.accumulo.test.functional.BadIterator;
import org.apache.hadoop.io.Text;
import org.apache.thrift.TException;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
@@ -229,4 +238,122 @@ public class TableOperationsIT extends AccumuloClusterIT {
return map;
}
+  /** Compacts a freshly created, empty table with a data-generating iterator and expects the generated entries to be scannable afterwards. */
+  @Test
+  public void testCompactEmptyTableWithGeneratorIterator() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    final String table = getUniqueNames(1)[0];
+    connector.tableOperations().create(table);
+
+    // nothing is ever written to the table; the compaction iterator is the only configured source of entries
+    List<IteratorSetting> compactionIterators = Collections.singletonList(new IteratorSetting(15, HardListIterator.class));
+    connector.tableOperations().compact(table, null, null, compactionIterators, true, true);
+
+    // keys are compared on row, colF and colQ only
+    Map<Key,Value> scanned = new TreeMap<>(COMPARE_KEY_TO_COLQ);
+    Scanner tableScanner = connector.createScanner(table, Authorizations.EMPTY);
+    for (Entry<Key,Value> kv : tableScanner) {
+      scanned.put(kv.getKey(), kv.getValue());
+    }
+    assertEquals(HardListIterator.allEntriesToInject, scanned);
+    connector.tableOperations().delete(table);
+  }
+
+  /** Orders keys by row, column family and column qualifier only; the remaining key parts do not take part in the comparison. */
+  static class KeyRowColFColQComparator implements Comparator<Key> {
+    @Override
+    public int compare(Key left, Key right) {
+      final PartialKey comparedPart = PartialKey.ROW_COLFAM_COLQUAL;
+      return left.compareTo(right, comparedPart);
+    }
+  }
+
+  /** Shared comparator instance used when collecting scan results. */
+  static final KeyRowColFColQComparator COMPARE_KEY_TO_COLQ = new KeyRowColFColQComparator();
+
+  /** Same as the single-tablet generator test, but the empty table is first split at row "f". */
+  @Test
+  public void testCompactEmptyTableWithGeneratorIterator_Splits() throws TableExistsException, AccumuloException, AccumuloSecurityException,
+      TableNotFoundException {
+    final String table = getUniqueNames(1)[0];
+    connector.tableOperations().create(table);
+    SortedSet<Text> splitPoints = new TreeSet<>(Collections.singleton(new Text("f")));
+    connector.tableOperations().addSplits(table, splitPoints);
+
+    List<IteratorSetting> compactionIterators = Collections.singletonList(new IteratorSetting(15, HardListIterator.class));
+    connector.tableOperations().compact(table, null, null, compactionIterators, true, true);
+
+    // keys are compared on row, colF and colQ only
+    Map<Key,Value> scanned = new TreeMap<>(COMPARE_KEY_TO_COLQ);
+    Scanner tableScanner = connector.createScanner(table, Authorizations.EMPTY);
+    for (Entry<Key,Value> kv : tableScanner) {
+      scanned.put(kv.getKey(), kv.getValue());
+    }
+    assertEquals(HardListIterator.allEntriesToInject, scanned);
+    connector.tableOperations().delete(table);
+  }
+
+ @Test
+ public void testCompactEmptyTableWithGeneratorIterator_Splits_Cancel() throws TableExistsException, AccumuloException, AccumuloSecurityException,
+ TableNotFoundException {
+ String tableName = getUniqueNames(1)[0];
+ connector.tableOperations().create(tableName);
+ SortedSet<Text> splitset = new TreeSet<>();
+ splitset.add(new Text("f"));
+ connector.tableOperations().addSplits(tableName, splitset);
+
+ List<IteratorSetting> list = new ArrayList<>();
+ list.add(new IteratorSetting(15, HardListIterator.class));
+ connector.tableOperations().compact(tableName, null, null, list, true, false); // don't block
+ connector.tableOperations().cancelCompaction(tableName);
+ // depending on timing, compaction will finish or be canceled
+
+ Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+ Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ
+ for (Map.Entry<Key,Value> entry : scanner)
+ actual.put(entry.getKey(), entry.getValue());
+ Assume.assumeFalse("Compaction successfully occurred due to weird timing but we hoped it would cancel.",
+ HardListIterator.allEntriesToInject.equals(actual));
+ assertTrue("Scan should be empty if compaction canceled. " + "Actual is " + actual, actual.isEmpty());
+ connector.tableOperations().delete(tableName);
+ }
+
+  /** Compacts only the range after the split row and expects just the generated entries from that part of the table. */
+  @Test
+  public void testCompactEmptyTableWithGeneratorIterator_Splits_Partial() throws TableExistsException, AccumuloException, AccumuloSecurityException,
+      TableNotFoundException {
+    final String table = getUniqueNames(1)[0];
+    connector.tableOperations().create(table);
+    final Text splitRow = new Text("f");
+    SortedSet<Text> splitPoints = new TreeSet<>(Collections.singleton(splitRow));
+    connector.tableOperations().addSplits(table, splitPoints);
+
+    List<IteratorSetting> compactionIterators = Collections.singletonList(new IteratorSetting(15, HardListIterator.class));
+    // compact the second tablet, not the first
+    connector.tableOperations().compact(table, splitRow, null, compactionIterators, true, true);
+
+    // keys are compared on row, colF and colQ only
+    Map<Key,Value> scanned = new TreeMap<>(COMPARE_KEY_TO_COLQ);
+    Scanner tableScanner = connector.createScanner(table, Authorizations.EMPTY);
+    for (Entry<Key,Value> kv : tableScanner) {
+      scanned.put(kv.getKey(), kv.getValue());
+    }
+    // only expect the entries in the second tablet
+    Map<Key,Value> expected = HardListIterator.allEntriesToInject.tailMap(new Key(splitRow));
+    assertEquals(expected, scanned);
+    connector.tableOperations().delete(table);
+  }
+
+  /** Checks that a compaction configured with a bad majc iterator can be abandoned through a compaction cancel, leaving the table empty. */
+  @Test
+  public void testCompactEmptyTablesWithBadIterator_FailsAndCancel() throws TableExistsException, AccumuloException, AccumuloSecurityException,
+      TableNotFoundException {
+    final String table = getUniqueNames(1)[0];
+    connector.tableOperations().create(table);
+
+    List<IteratorSetting> compactionIterators = Collections.singletonList(new IteratorSetting(15, BadIterator.class));
+    // last argument false: return without waiting for the compaction to complete
+    connector.tableOperations().compact(table, null, null, compactionIterators, true, false);
+    UtilWaitThread.sleep(2000); // start compaction
+    connector.tableOperations().cancelCompaction(table);
+
+    Map<Key,Value> scanned = new TreeMap<>();
+    Scanner tableScanner = connector.createScanner(table, Authorizations.EMPTY);
+    for (Entry<Key,Value> kv : tableScanner) {
+      scanned.put(kv.getKey(), kv.getValue());
+    }
+    assertTrue("Should be empty. Actual is " + scanned, scanned.isEmpty());
+    connector.tableOperations().delete(table);
+  }
+
}