You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/12/02 09:06:17 UTC
[2/5] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9f19dd4e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9f19dd4e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9f19dd4e
Branch: refs/heads/trunk
Commit: 9f19dd4e4e4f4adc948b36a3fd38077cbc691617
Parents: a320737 dbfeeac
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Dec 2 08:53:33 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 2 08:53:33 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 3 +-
src/java/org/apache/cassandra/dht/Range.java | 44 +++++++++++++
.../org/apache/cassandra/dht/RangeTest.java | 66 ++++++++++++++++++++
4 files changed, 113 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f19dd4e/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index cf73f57,b0f9588..eaad3a2
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,23 -1,5 +1,24 @@@
-2.1.12
+2.2.4
+ * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
+ * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
+ * Reject index queries while the index is building (CASSANDRA-8505)
+ * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
+ * Fix JSON update with prepared statements (CASSANDRA-10631)
+ * Don't do anticompaction after subrange repair (CASSANDRA-10422)
+ * Fix SimpleDateType type compatibility (CASSANDRA-10027)
+ * (Hadoop) fix splits calculation (CASSANDRA-10640)
+ * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
+ * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
+ * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
+ * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
+ * Expose phi values from failure detector via JMX and tweak debug
+ and trace logging (CASSANDRA-9526)
+ * Fix RangeNamesQueryPager (CASSANDRA-10509)
+ * Deprecate Pig support (CASSANDRA-10542)
+ * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+ * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592)
+Merged from 2.1:
+ * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)
* Add proper error handling to stream receiver (CASSANDRA-10774)
* Warn or fail when changing cluster topology live (CASSANDRA-10243)
* Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f19dd4e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index f3a69a6,2630ba2..65f93c0
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1192,90 -1092,68 +1192,91 @@@ public class CompactionManager implemen
if (!new File(sstable.getFilename()).exists())
{
logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable);
+ i.remove();
continue;
}
+ if (groupMaxDataAge < sstable.maxDataAge)
+ groupMaxDataAge = sstable.maxDataAge;
+ }
- logger.info("Anticompacting {}", sstable);
- Set<SSTableReader> sstableAsSet = new HashSet<>();
- sstableAsSet.add(sstable);
+ if (anticompactionGroup.originals().size() == 0)
+ {
+ logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
+ return 0;
+ }
- File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
- SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, false);
- SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, false);
+ logger.info("Anticompacting {}", anticompactionGroup);
+ Set<SSTableReader> sstableAsSet = anticompactionGroup.originals();
+
+ File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
+ long repairedKeyCount = 0;
+ long unrepairedKeyCount = 0;
+ AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+ try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false);
+ SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false);
+ AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup.originals());
+ CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs)))
+ {
+ int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
- try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(new HashSet<>(Collections.singleton(sstable)));
- CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs)))
- {
- int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)sstable.estimatedKeys());
- repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
- unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
+ repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
+ unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
- CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
- Iterator<AbstractCompactedRow> iter = ci.iterator();
- metrics.beginCompaction(ci);
- try
+ CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID());
+ metrics.beginCompaction(ci);
+ try
+ {
+ @SuppressWarnings("resource")
+ CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
++ Range.OrderedRangeContainmentChecker containmentChecker = new Range.OrderedRangeContainmentChecker(ranges);
+ while (iter.hasNext())
{
- Range.OrderedRangeContainmentChecker containmentChecker = new Range.OrderedRangeContainmentChecker(ranges);
- while (iter.hasNext())
+ @SuppressWarnings("resource")
+ AbstractCompactedRow row = iter.next();
+ // if current range from sstable is repaired, save it into the new repaired sstable
- if (Range.isInRanges(row.key.getToken(), ranges))
++ if (containmentChecker.contains(row.key.getToken()))
{
- AbstractCompactedRow row = iter.next();
- if (containmentChecker.contains(row.key.getToken()))
- {
- repairedSSTableWriter.append(row);
- repairedKeyCount++;
- }
- // otherwise save into the new 'non-repaired' table
- else
- {
- unRepairedSSTableWriter.append(row);
- unrepairedKeyCount++;
- }
+ repairedSSTableWriter.append(row);
+ repairedKeyCount++;
+ }
+ // otherwise save into the new 'non-repaired' table
+ else
+ {
+ unRepairedSSTableWriter.append(row);
+ unrepairedKeyCount++;
}
}
- finally
- {
- metrics.finishCompaction(ci);
- }
- anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
- anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
- cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
}
- catch (Throwable e)
+ finally
{
- JVMStabilityInspector.inspectThrowable(e);
- logger.error("Error anticompacting " + sstable, e);
- repairedSSTableWriter.abort();
- unRepairedSSTableWriter.abort();
+ metrics.finishCompaction(ci);
}
- }
- String format = "Repaired {} keys of {} for {}/{}";
- logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName());
- String format2 = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
- logger.info(format2, repairedSSTables.size(), anticompactedSSTables.size());
- return anticompactedSSTables;
+ List<SSTableReader> anticompactedSSTables = new ArrayList<>();
+ // since both writers are operating over the same Transaction, we cannot use the convenience Transactional.finish() method,
+ // as on the second finish() we would prepareToCommit() on a Transaction that has already been committed, which is forbidden by the API
+ // (since it indicates misuse). We call permitRedundantTransitions so that calls that transition to a state already occupied are permitted.
+ anticompactionGroup.permitRedundantTransitions();
+ repairedSSTableWriter.setRepairedAt(repairedAt).prepareToCommit();
+ unRepairedSSTableWriter.prepareToCommit();
+ anticompactedSSTables.addAll(repairedSSTableWriter.finished());
+ anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
+ repairedSSTableWriter.commit();
+ unRepairedSSTableWriter.commit();
+
+ logger.trace("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
+ repairedKeyCount + unrepairedKeyCount,
+ cfs.keyspace.getName(),
+ cfs.getColumnFamilyName(),
+ anticompactionGroup);
+ return anticompactedSSTables.size();
+ }
+ catch (Throwable e)
+ {
+ JVMStabilityInspector.inspectThrowable(e);
+ logger.error("Error anticompacting " + anticompactionGroup, e);
+ }
+ return 0;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f19dd4e/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/dht/Range.java
index cbf093c,81c92a2..9893531
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@@ -21,7 -21,10 +21,9 @@@ import java.io.Serializable
import java.util.*;
import org.apache.commons.lang3.ObjectUtils;
+
+ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.Pair;
/**
@@@ -465,13 -472,67 +467,55 @@@ public class Range<T extends RingPositi
/**
* Compute a range of keys corresponding to a given range of token.
*/
- public static Range<RowPosition> makeRowRange(Token left, Token right, IPartitioner partitioner)
+ public static Range<RowPosition> makeRowRange(Token left, Token right)
{
- return new Range<RowPosition>(left.maxKeyBound(partitioner), right.maxKeyBound(partitioner), partitioner);
+ return new Range<RowPosition>(left.maxKeyBound(), right.maxKeyBound());
}
- @SuppressWarnings("unchecked")
- public AbstractBounds<RowPosition> toRowBounds()
+ public static Range<RowPosition> makeRowRange(Range<Token> tokenBounds)
{
- return (left instanceof Token) ? makeRowRange((Token)left, (Token)right, partitioner) : (Range<RowPosition>)this;
- }
-
- @SuppressWarnings("unchecked")
- public AbstractBounds<Token> toTokenBounds()
- {
- return (left instanceof RowPosition) ? new Range<Token>(((RowPosition)left).getToken(), ((RowPosition)right).getToken(), partitioner) : (Range<Token>)this;
- }
-
- public AbstractBounds<T> withNewRight(T newRight)
- {
- return new Range<T>(left, newRight);
+ return makeRowRange(tokenBounds.left, tokenBounds.right);
}
+
+ /**
+ * Helper class to check if a token is contained within a given collection of ranges
+ */
+ public static class OrderedRangeContainmentChecker
+ {
+ private final Iterator<Range<Token>> normalizedRangesIterator;
+ private Token lastToken = null;
+ private Range<Token> currentRange;
+
+ public OrderedRangeContainmentChecker(Collection<Range<Token>> ranges)
+ {
+ normalizedRangesIterator = normalize(ranges).iterator();
+ assert normalizedRangesIterator.hasNext();
+ currentRange = normalizedRangesIterator.next();
+ }
+
+ /**
+ * Returns true if the ranges given in the constructor contains the token, false otherwise.
+ *
+ * The tokens passed to this method must be in increasing order
+ *
+ * @param t token to check, must be larger than or equal to the last token passed
+ * @return true if the token is contained within the ranges given to the constructor.
+ */
+ public boolean contains(Token t)
+ {
+ assert lastToken == null || lastToken.compareTo(t) <= 0;
+ lastToken = t;
+ while (true)
+ {
+ if (t.compareTo(currentRange.left) <= 0)
+ return false;
+ else if (t.compareTo(currentRange.right) <= 0 || currentRange.right.compareTo(currentRange.left) <= 0)
+ return true;
+
+ if (!normalizedRangesIterator.hasNext())
+ return false;
+ currentRange = normalizedRangesIterator.next();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f19dd4e/test/unit/org/apache/cassandra/dht/RangeTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/dht/RangeTest.java
index d93356a,1d8123b..85f2586
--- a/test/unit/org/apache/cassandra/dht/RangeTest.java
+++ b/test/unit/org/apache/cassandra/dht/RangeTest.java
@@@ -27,11 -32,10 +32,12 @@@ import static java.util.Arrays.asList
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
-
import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
+import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
+
import static org.apache.cassandra.Util.range;
+ import static org.junit.Assert.*;
public class RangeTest
@@@ -540,4 -543,64 +546,64 @@@
expected = asList(range("", ""));
assertNormalize(input, expected);
}
+
+ @Test
+ public void testRandomOrderedRangeContainmentChecker()
+ {
+ Random r = new Random();
+ for (int j = 0; j < 1000; j++)
+ {
+ int numTokens = r.nextInt(300) + 1;
+ List<Range<Token>> ranges = new ArrayList<>(numTokens);
+ List<Token> tokens = new ArrayList<>(2 * numTokens);
+ for (int i = 0; i < 2 * numTokens; i++)
+ tokens.add(t(r.nextLong()));
+
+ Collections.sort(tokens);
+
+ for (int i = 0; i < tokens.size(); i++)
+ {
+ ranges.add(new Range<>(tokens.get(i), tokens.get(i + 1)));
+ i++;
+ }
+
+ List<Token> tokensToTest = new ArrayList<>();
+ for (int i = 0; i < 10000; i++)
+ tokensToTest.add(t(r.nextLong()));
+
+ tokensToTest.add(t(Long.MAX_VALUE));
+ tokensToTest.add(t(Long.MIN_VALUE));
+ tokensToTest.add(t(Long.MAX_VALUE - 1));
+ tokensToTest.add(t(Long.MIN_VALUE + 1));
+ Collections.sort(tokensToTest);
+
+ Range.OrderedRangeContainmentChecker checker = new Range.OrderedRangeContainmentChecker(ranges);
+ for (Token t : tokensToTest)
+ {
+ if (checker.contains(t) != Range.isInRanges(t, ranges)) // avoid running Joiner.on(..) every iteration
+ fail(String.format("This should never flap! If it does, it is a bug (ranges = %s, token = %s)", Joiner.on(",").join(ranges), t));
+ }
+ }
+ }
+
+ @Test
+ public void testBoundariesORCC()
+ {
+ List<Range<Token>> ranges = asList(r(Long.MIN_VALUE, Long.MIN_VALUE + 1), r(Long.MAX_VALUE - 1, Long.MAX_VALUE));
+ Range.OrderedRangeContainmentChecker checker = new Range.OrderedRangeContainmentChecker(ranges);
+ assertFalse(checker.contains(t(Long.MIN_VALUE)));
+ assertTrue(checker.contains(t(Long.MIN_VALUE + 1)));
+ assertFalse(checker.contains(t(0)));
+ assertFalse(checker.contains(t(Long.MAX_VALUE - 1)));
+ assertTrue(checker.contains(t(Long.MAX_VALUE)));
+ }
+
+ private static Range<Token> r(long left, long right)
+ {
+ return new Range<>(t(left), t(right));
+ }
+ private static Token t(long t)
+ {
- return new LongToken(t);
++ return new Murmur3Partitioner.LongToken(t);
+ }
}