You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/05/12 13:23:24 UTC
[19/20] cassandra git commit: Merge commit
'78a3d2bba95b9efcda152a157f822f4970f22636' into cassandra-3.7
Merge commit '78a3d2bba95b9efcda152a157f822f4970f22636' into cassandra-3.7
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c9ac0506
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c9ac0506
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c9ac0506
Branch: refs/heads/trunk
Commit: c9ac0506c41667ebe9b59acb73d6329401461289
Parents: 902877a 78a3d2b
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu May 12 15:19:52 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu May 12 15:20:41 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 283 ++++++++++++-------
.../org/apache/cassandra/db/Directories.java | 2 +-
src/java/org/apache/cassandra/db/Memtable.java | 70 +++--
.../db/commitlog/CommitLogReplayer.java | 59 ++--
.../cassandra/db/commitlog/ReplayPosition.java | 93 ++++--
.../compaction/AbstractCompactionStrategy.java | 3 -
.../compaction/CompactionStrategyManager.java | 3 -
.../apache/cassandra/db/lifecycle/Tracker.java | 53 +++-
.../org/apache/cassandra/db/lifecycle/View.java | 39 ++-
.../apache/cassandra/db/view/TableViews.java | 4 +-
.../io/sstable/format/SSTableReader.java | 5 -
.../cassandra/io/sstable/format/Version.java | 2 +
.../io/sstable/format/big/BigFormat.java | 12 +-
.../io/sstable/metadata/CompactionMetadata.java | 11 +-
.../metadata/LegacyMetadataSerializer.java | 13 +-
.../io/sstable/metadata/MetadataCollector.java | 36 ++-
.../io/sstable/metadata/StatsMetadata.java | 39 ++-
.../cassandra/tools/SSTableMetadataViewer.java | 3 +-
.../legacy_mb_clust/mb-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
.../legacy_mb_clust/mb-1-big-Data.db | Bin 0 -> 5342 bytes
.../legacy_mb_clust/mb-1-big-Digest.crc32 | 1 +
.../legacy_mb_clust/mb-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mb_clust/mb-1-big-Index.db | Bin 0 -> 157553 bytes
.../legacy_mb_clust/mb-1-big-Statistics.db | Bin 0 -> 7058 bytes
.../legacy_mb_clust/mb-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mb_clust/mb-1-big-TOC.txt | 8 +
.../mb-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
.../legacy_mb_clust_compact/mb-1-big-Data.db | Bin 0 -> 5383 bytes
.../mb-1-big-Digest.crc32 | 1 +
.../legacy_mb_clust_compact/mb-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mb_clust_compact/mb-1-big-Index.db | Bin 0 -> 157553 bytes
.../mb-1-big-Statistics.db | Bin 0 -> 7058 bytes
.../legacy_mb_clust_compact/mb-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mb_clust_compact/mb-1-big-TOC.txt | 8 +
.../mb-1-big-CompressionInfo.db | Bin 0 -> 75 bytes
.../legacy_mb_clust_counter/mb-1-big-Data.db | Bin 0 -> 4625 bytes
.../mb-1-big-Digest.crc32 | 1 +
.../legacy_mb_clust_counter/mb-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mb_clust_counter/mb-1-big-Index.db | Bin 0 -> 157553 bytes
.../mb-1-big-Statistics.db | Bin 0 -> 7067 bytes
.../legacy_mb_clust_counter/mb-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mb_clust_counter/mb-1-big-TOC.txt | 8 +
.../mb-1-big-CompressionInfo.db | Bin 0 -> 75 bytes
.../mb-1-big-Data.db | Bin 0 -> 4639 bytes
.../mb-1-big-Digest.crc32 | 1 +
.../mb-1-big-Filter.db | Bin 0 -> 24 bytes
.../mb-1-big-Index.db | Bin 0 -> 157553 bytes
.../mb-1-big-Statistics.db | Bin 0 -> 7067 bytes
.../mb-1-big-Summary.db | Bin 0 -> 47 bytes
.../mb-1-big-TOC.txt | 8 +
.../mb-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../legacy_mb_simple/mb-1-big-Data.db | Bin 0 -> 91 bytes
.../legacy_mb_simple/mb-1-big-Digest.crc32 | 1 +
.../legacy_mb_simple/mb-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mb_simple/mb-1-big-Index.db | Bin 0 -> 26 bytes
.../legacy_mb_simple/mb-1-big-Statistics.db | Bin 0 -> 4611 bytes
.../legacy_mb_simple/mb-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mb_simple/mb-1-big-TOC.txt | 8 +
.../mb-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../legacy_mb_simple_compact/mb-1-big-Data.db | Bin 0 -> 91 bytes
.../mb-1-big-Digest.crc32 | 1 +
.../legacy_mb_simple_compact/mb-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mb_simple_compact/mb-1-big-Index.db | Bin 0 -> 26 bytes
.../mb-1-big-Statistics.db | Bin 0 -> 4652 bytes
.../mb-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mb_simple_compact/mb-1-big-TOC.txt | 8 +
.../mb-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../legacy_mb_simple_counter/mb-1-big-Data.db | Bin 0 -> 115 bytes
.../mb-1-big-Digest.crc32 | 1 +
.../legacy_mb_simple_counter/mb-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mb_simple_counter/mb-1-big-Index.db | Bin 0 -> 27 bytes
.../mb-1-big-Statistics.db | Bin 0 -> 4620 bytes
.../mb-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mb_simple_counter/mb-1-big-TOC.txt | 8 +
.../mb-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../mb-1-big-Data.db | Bin 0 -> 114 bytes
.../mb-1-big-Digest.crc32 | 1 +
.../mb-1-big-Filter.db | Bin 0 -> 24 bytes
.../mb-1-big-Index.db | Bin 0 -> 27 bytes
.../mb-1-big-Statistics.db | Bin 0 -> 4661 bytes
.../mb-1-big-Summary.db | Bin 0 -> 47 bytes
.../mb-1-big-TOC.txt | 8 +
.../db/commitlog/CommitLogStressTest.java | 2 +-
.../cassandra/db/RecoveryManagerTest.java | 2 +-
.../cassandra/db/commitlog/CommitLogTest.java | 4 +-
.../db/commitlog/CommitLogTestReplayer.java | 2 +-
.../cassandra/db/lifecycle/TrackerTest.java | 30 +-
.../apache/cassandra/db/lifecycle/ViewTest.java | 2 +-
.../CompressedRandomAccessReaderTest.java | 4 +-
.../CompressedSequentialWriterTest.java | 2 +-
.../cassandra/io/sstable/LegacySSTableTest.java | 2 +-
.../metadata/MetadataSerializerTest.java | 83 +++++-
.../cassandra/io/util/MmappedRegionsTest.java | 3 +-
94 files changed, 644 insertions(+), 295 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d138389,677ea11..fb8d272
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -3,78 -2,9 +3,79 @@@ Merged from 3.0
* Refactor Materialized View code (CASSANDRA-11475)
* Update Java Driver (CASSANDRA-11615)
Merged from 2.2:
++ * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
* Prohibit Reversed Counter type as part of the PK (CASSANDRA-9395)
+ * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
-3.0.6
+3.6
+ * Allow server startup if JMX is configured directly (CASSANDRA-11725)
+ * Prevent direct memory OOM on buffer pool allocations (CASSANDRA-11710)
+ * Enhanced Compaction Logging (CASSANDRA-10805)
+ * Make prepared statement cache size configurable (CASSANDRA-11555)
+ * Integrated JMX authentication and authorization (CASSANDRA-10091)
+ * Add units to stress ouput (CASSANDRA-11352)
+ * Fix PER PARTITION LIMIT for single and multi partitions queries (CASSANDRA-11603)
+ * Add uncompressed chunk cache for RandomAccessReader (CASSANDRA-5863)
+ * Clarify ClusteringPrefix hierarchy (CASSANDRA-11213)
+ * Always perform collision check before joining ring (CASSANDRA-10134)
+ * SSTableWriter output discrepancy (CASSANDRA-11646)
+ * Fix potential timeout in NativeTransportService.testConcurrentDestroys (CASSANDRA-10756)
+ * Support large partitions on the 3.0 sstable format (CASSANDRA-11206)
+ * Add support to rebuild from specific range (CASSANDRA-10406)
+ * Optimize the overlapping lookup by calculating all the
+ bounds in advance (CASSANDRA-11571)
+ * Support json/yaml output in noetool tablestats (CASSANDRA-5977)
+ * (stress) Add datacenter option to -node options (CASSANDRA-11591)
+ * Fix handling of empty slices (CASSANDRA-11513)
+ * Make number of cores used by cqlsh COPY visible to testing code (CASSANDRA-11437)
+ * Allow filtering on clustering columns for queries without secondary indexes (CASSANDRA-11310)
+ * Refactor Restriction hierarchy (CASSANDRA-11354)
+ * Eliminate allocations in R/W path (CASSANDRA-11421)
+ * Update Netty to 4.0.36 (CASSANDRA-11567)
+ * Fix PER PARTITION LIMIT for queries requiring post-query ordering (CASSANDRA-11556)
+ * Allow instantiation of UDTs and tuples in UDFs (CASSANDRA-10818)
+ * Support UDT in CQLSSTableWriter (CASSANDRA-10624)
+ * Support for non-frozen user-defined types, updating
+ individual fields of user-defined types (CASSANDRA-7423)
+ * Make LZ4 compression level configurable (CASSANDRA-11051)
+ * Allow per-partition LIMIT clause in CQL (CASSANDRA-7017)
+ * Make custom filtering more extensible with UserExpression (CASSANDRA-11295)
+ * Improve field-checking and error reporting in cassandra.yaml (CASSANDRA-10649)
+ * Print CAS stats in nodetool proxyhistograms (CASSANDRA-11507)
+ * More user friendly error when providing an invalid token to nodetool (CASSANDRA-9348)
+ * Add static column support to SASI index (CASSANDRA-11183)
+ * Support EQ/PREFIX queries in SASI CONTAINS mode without tokenization (CASSANDRA-11434)
+ * Support LIKE operator in prepared statements (CASSANDRA-11456)
+ * Add a command to see if a Materialized View has finished building (CASSANDRA-9967)
+ * Log endpoint and port associated with streaming operation (CASSANDRA-8777)
+ * Print sensible units for all log messages (CASSANDRA-9692)
+ * Upgrade Netty to version 4.0.34 (CASSANDRA-11096)
+ * Break the CQL grammar into separate Parser and Lexer (CASSANDRA-11372)
+ * Compress only inter-dc traffic by default (CASSANDRA-8888)
+ * Add metrics to track write amplification (CASSANDRA-11420)
+ * cassandra-stress: cannot handle "value-less" tables (CASSANDRA-7739)
+ * Add/drop multiple columns in one ALTER TABLE statement (CASSANDRA-10411)
+ * Add require_endpoint_verification opt for internode encryption (CASSANDRA-9220)
+ * Add auto import java.util for UDF code block (CASSANDRA-11392)
+ * Add --hex-format option to nodetool getsstables (CASSANDRA-11337)
+ * sstablemetadata should print sstable min/max token (CASSANDRA-7159)
+ * Do not wrap CassandraException in TriggerExecutor (CASSANDRA-9421)
+ * COPY TO should have higher double precision (CASSANDRA-11255)
+ * Stress should exit with non-zero status after failure (CASSANDRA-10340)
+ * Add client to cqlsh SHOW_SESSION (CASSANDRA-8958)
+ * Fix nodetool tablestats keyspace level metrics (CASSANDRA-11226)
+ * Store repair options in parent_repair_history (CASSANDRA-11244)
+ * Print current leveling in sstableofflinerelevel (CASSANDRA-9588)
+ * Change repair message for keyspaces with RF 1 (CASSANDRA-11203)
+ * Remove hard-coded SSL cipher suites and protocols (CASSANDRA-10508)
+ * Improve concurrency in CompactionStrategyManager (CASSANDRA-10099)
+ * (cqlsh) interpret CQL type for formatting blobs (CASSANDRA-11274)
+ * Refuse to start and print txn log information in case of disk
+ corruption (CASSANDRA-10112)
+ * Resolve some eclipse-warnings (CASSANDRA-11086)
+ * (cqlsh) Show static columns in a different color (CASSANDRA-11059)
+ * Allow to remove TTLs on table with default_time_to_live (CASSANDRA-11207)
+Merged from 3.0:
* Disallow creating view with a static column (CASSANDRA-11602)
* Reduce the amount of object allocations caused by the getFunctions methods (CASSANDRA-11593)
* Potential error replaying commitlog with smallint/tinyint/date/time types (CASSANDRA-11618)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index fffd87f,e9a2938..8dedd23
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -920,10 -903,13 +931,13 @@@ public class ColumnFamilyStore implemen
final boolean flushSecondaryIndexes;
final OpOrder.Barrier writeBarrier;
final CountDownLatch latch = new CountDownLatch(1);
- final ReplayPosition lastReplayPosition;
- volatile FSWriteError flushFailure = null;
+ volatile Throwable flushFailure = null;
+ final ReplayPosition commitLogUpperBound;
+ final List<Memtable> memtables;
+ final List<Collection<SSTableReader>> readers;
- private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition lastReplayPosition)
+ private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition commitLogUpperBound,
+ List<Memtable> memtables, List<Collection<SSTableReader>> readers)
{
this.writeBarrier = writeBarrier;
this.flushSecondaryIndexes = flushSecondaryIndexes;
@@@ -966,7 -962,9 +990,9 @@@
metric.pendingFlushes.dec();
if (flushFailure != null)
- throw flushFailure;
+ throw Throwables.propagate(flushFailure);
+
+ return commitLogUpperBound;
}
}
@@@ -1052,7 -1041,8 +1069,7 @@@
memtable.cfs.data.markFlushing(memtable);
if (memtable.isClean() || truncate)
{
- memtable.cfs.replaceFlushed(memtable, null);
+ memtable.cfs.data.replaceFlushed(memtable, Collections.emptyList());
- memtable.cfs.compactionStrategyManager.replaceFlushed(memtable, Collections.emptyList());
reclaim(memtable);
iter.remove();
}
@@@ -1070,108 -1054,23 +1081,109 @@@
{
for (Memtable memtable : memtables)
{
- flushMemtable(memtable);
- Collection<SSTableReader> readers = memtable.flush();
- memtable.cfs.data.replaceFlushed(memtable, readers);
- reclaim(memtable);
- this.readers.add(readers);
++ this.readers.add(flushMemtable(memtable));
}
}
- catch (FSWriteError e)
+ catch (Throwable t)
{
- JVMStabilityInspector.inspectThrowable(e);
- // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
- postFlush.flushFailure = e;
+ JVMStabilityInspector.inspectThrowable(t);
+ postFlush.flushFailure = t;
}
-
// signal the post-flush we've done our work
postFlush.latch.countDown();
}
- public void flushMemtable(Memtable memtable)
++ public Collection<SSTableReader> flushMemtable(Memtable memtable)
+ {
+ List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
+ long totalBytesOnDisk = 0;
+ long maxBytesOnDisk = 0;
+ long minBytesOnDisk = Long.MAX_VALUE;
+ List<SSTableReader> sstables = new ArrayList<>();
+ try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH))
+ {
+ List<Memtable.FlushRunnable> flushRunnables = null;
+ List<SSTableMultiWriter> flushResults = null;
+
+ try
+ {
+ // flush the memtable
+ flushRunnables = memtable.flushRunnables(txn);
+
+ for (int i = 0; i < flushRunnables.size(); i++)
+ futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
+
+ flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures));
+ }
+ catch (Throwable t)
+ {
+ t = memtable.abortRunnables(flushRunnables, t);
+ t = txn.abort(t);
+ throw Throwables.propagate(t);
+ }
+
+ try
+ {
+ Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator();
+ while (writerIterator.hasNext())
+ {
+ @SuppressWarnings("resource")
+ SSTableMultiWriter writer = writerIterator.next();
+ if (writer.getFilePointer() > 0)
+ {
+ writer.setOpenResult(true).prepareToCommit();
+ }
+ else
+ {
+ maybeFail(writer.abort(null));
+ writerIterator.remove();
+ }
+ }
+ }
+ catch (Throwable t)
+ {
+ for (SSTableMultiWriter writer : flushResults)
+ t = writer.abort(t);
+ t = txn.abort(t);
+ Throwables.propagate(t);
+ }
+
+ txn.prepareToCommit();
+
+ Throwable accumulate = null;
+ for (SSTableMultiWriter writer : flushResults)
+ accumulate = writer.commit(accumulate);
+
+ maybeFail(txn.commit(accumulate));
+
+ for (SSTableMultiWriter writer : flushResults)
+ {
+ Collection<SSTableReader> flushedSSTables = writer.finished();
+ for (SSTableReader sstable : flushedSSTables)
+ {
+ if (sstable != null)
+ {
+ sstables.add(sstable);
+ long size = sstable.bytesOnDisk();
+ totalBytesOnDisk += size;
+ maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
+ minBytesOnDisk = Math.min(minBytesOnDisk, size);
+ }
+ }
+ }
+ }
- memtable.cfs.replaceFlushed(memtable, sstables);
++ memtable.cfs.data.replaceFlushed(memtable, sstables);
+ reclaim(memtable);
+ memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables);
+ logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}",
+ sstables,
+ sstables.size(),
+ FBUtilities.prettyPrintMemory(totalBytesOnDisk),
+ FBUtilities.prettyPrintMemory(maxBytesOnDisk),
+ FBUtilities.prettyPrintMemory(minBytesOnDisk));
++ return sstables;
+ }
+
private void reclaim(final Memtable memtable)
{
// issue a read barrier for reclaiming the memory, and offload the wait to another thread
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 6fe89a9,93dc5af..27c5372
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -34,7 -34,7 +34,6 @@@ import org.apache.cassandra.config.Colu
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.ReplayPosition;
--import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@@ -44,19 -44,16 +43,15 @@@ import org.apache.cassandra.db.rows.Unf
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
import org.apache.cassandra.index.transactions.UpdateTransaction;
- import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableTxnWriter;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
- import org.apache.cassandra.io.sstable.SSTableTxnWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.util.DiskAwareRunnable;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.concurrent.OpOrder;
- import org.apache.cassandra.utils.memory.HeapAllocator;
import org.apache.cassandra.utils.memory.MemtableAllocator;
import org.apache.cassandra.utils.memory.MemtablePool;
@@@ -257,49 -263,6 +261,48 @@@ public class Memtable implements Compar
return partitions.size();
}
+ public List<FlushRunnable> flushRunnables(LifecycleTransaction txn)
+ {
+ List<Range<Token>> localRanges = Range.sort(StorageService.instance.getLocalRanges(cfs.keyspace.getName()));
+
+ if (!cfs.getPartitioner().splitter().isPresent() || localRanges.isEmpty())
- return Collections.singletonList(new FlushRunnable(lastReplayPosition.get(), txn));
++ return Collections.singletonList(new FlushRunnable(txn));
+
+ return createFlushRunnables(localRanges, txn);
+ }
+
+ private List<FlushRunnable> createFlushRunnables(List<Range<Token>> localRanges, LifecycleTransaction txn)
+ {
+ assert cfs.getPartitioner().splitter().isPresent();
+
+ Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
+ List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
+ List<FlushRunnable> runnables = new ArrayList<>(boundaries.size());
+ PartitionPosition rangeStart = cfs.getPartitioner().getMinimumToken().minKeyBound();
- ReplayPosition context = lastReplayPosition.get();
+ try
+ {
+ for (int i = 0; i < boundaries.size(); i++)
+ {
+ PartitionPosition t = boundaries.get(i);
- runnables.add(new FlushRunnable(context, rangeStart, t, locations[i], txn));
++ runnables.add(new FlushRunnable(rangeStart, t, locations[i], txn));
+ rangeStart = t;
+ }
+ return runnables;
+ }
+ catch (Throwable e)
+ {
+ throw Throwables.propagate(abortRunnables(runnables, e));
+ }
+ }
+
+ public Throwable abortRunnables(List<FlushRunnable> runnables, Throwable t)
+ {
+ if (runnables != null)
+ for (FlushRunnable runnable : runnables)
+ t = runnable.writer.abort(t);
+ return t;
+ }
+
public String toString()
{
return String.format("Memtable-%s@%s(%s serialized bytes, %s ops, %.0f%%/%.0f%% of on/off-heap limit)",
@@@ -352,75 -315,46 +355,68 @@@
return partitions.get(key);
}
- public long creationTime()
- public Collection<SSTableReader> flush()
-- {
- return creationTime;
- long estimatedSize = estimatedSize();
- Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
- if (dataDirectory == null)
- throw new RuntimeException("Insufficient disk space to write " + estimatedSize + " bytes");
- File sstableDirectory = cfs.getDirectories().getLocationForDisk(dataDirectory);
- assert sstableDirectory != null : "Flush task is not bound to any disk";
- return writeSortedContents(sstableDirectory);
-- }
--
public long getMinTimestamp()
{
return minTimestamp;
}
- private long estimatedSize()
+ class FlushRunnable implements Callable<SSTableMultiWriter>
{
- public final ReplayPosition context;
- long keySize = 0;
- for (PartitionPosition key : partitions.keySet())
+ private final long estimatedSize;
+ private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush;
+
+ private final boolean isBatchLogTable;
+ private final SSTableMultiWriter writer;
+
+ // keeping these to be able to log what we are actually flushing
+ private final PartitionPosition from;
+ private final PartitionPosition to;
+
- FlushRunnable(ReplayPosition context, PartitionPosition from, PartitionPosition to, Directories.DataDirectory flushLocation, LifecycleTransaction txn)
++ FlushRunnable(PartitionPosition from, PartitionPosition to, Directories.DataDirectory flushLocation, LifecycleTransaction txn)
{
- this(context, partitions.subMap(from, to), flushLocation, from, to, txn);
- // make sure we don't write non-sensical keys
- assert key instanceof DecoratedKey;
- keySize += ((DecoratedKey)key).getKey().remaining();
++ this(partitions.subMap(from, to), flushLocation, from, to, txn);
}
- return (long) ((keySize // index entries
- + keySize // keys in data file
- + liveDataSize.get()) // data
- * 1.2); // bloom filter and row index overhead
- }
- FlushRunnable(ReplayPosition context, LifecycleTransaction txn)
- private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
- {
- boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
++ FlushRunnable(LifecycleTransaction txn)
+ {
- this(context, partitions, null, null, null, txn);
++ this(partitions, null, null, null, txn);
+ }
+
- FlushRunnable(ReplayPosition context, ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush, Directories.DataDirectory flushLocation, PartitionPosition from, PartitionPosition to, LifecycleTransaction txn)
++ FlushRunnable(ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush, Directories.DataDirectory flushLocation, PartitionPosition from, PartitionPosition to, LifecycleTransaction txn)
+ {
- this.context = context;
+ this.toFlush = toFlush;
+ this.from = from;
+ this.to = to;
+ long keySize = 0;
+ for (PartitionPosition key : toFlush.keySet())
+ {
+ // make sure we don't write non-sensical keys
+ assert key instanceof DecoratedKey;
+ keySize += ((DecoratedKey) key).getKey().remaining();
+ }
+ estimatedSize = (long) ((keySize // index entries
+ + keySize // keys in data file
+ + liveDataSize.get()) // data
+ * 1.2); // bloom filter and row index overhead
+
+ this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
- logger.debug("Writing {}", Memtable.this.toString());
+ if (flushLocation == null)
+ writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getWriteableLocationAsFile(estimatedSize)), columnsCollector.get(), statsCollector.get());
+ else
+ writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(flushLocation)), columnsCollector.get(), statsCollector.get());
+
+ }
- Collection<SSTableReader> ssTables;
- try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+ protected Directories getDirectories()
{
+ return cfs.getDirectories();
+ }
+
- private void writeSortedContents(ReplayPosition context)
++ private void writeSortedContents()
+ {
+ logger.debug("Writing {}, flushed range = ({}, {}]", Memtable.this.toString(), from, to);
+
boolean trackContention = logger.isTraceEnabled();
int heavilyContendedRowCount = 0;
// (we can't clear out the map as-we-go to free up memory,
@@@ -447,37 -381,59 +443,39 @@@
}
}
- if (writer.getFilePointer() > 0)
- {
- logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
- writer.getFilename(),
- FBUtilities.prettyPrintMemory(writer.getFilePointer()),
- commitLogUpperBound));
-
- // sstables should contain non-repaired data.
- ssTables = writer.finish(true);
- }
- else
- {
- logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
- writer.getFilename(), commitLogUpperBound);
- writer.abort();
- ssTables = Collections.emptyList();
- }
+ long bytesFlushed = writer.getFilePointer();
+ logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+ writer.getFilename(),
+ FBUtilities.prettyPrintMemory(bytesFlushed),
- context));
++ commitLogUpperBound));
+ // Update the metrics
+ cfs.metric.bytesFlushed.inc(bytesFlushed);
if (heavilyContendedRowCount > 0)
- logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
-
- return ssTables;
+ logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, toFlush.size(), Memtable.this.toString()));
}
- }
- @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
- public SSTableTxnWriter createFlushWriter(String filename,
- PartitionColumns columns,
- EncodingStats stats)
- {
- // we operate "offline" here, as we expose the resulting reader consciously when done
- // (although we may want to modify this behaviour in future, to encapsulate full flush behaviour in LifecycleTransaction)
- LifecycleTransaction txn = null;
- try
+ public SSTableMultiWriter createFlushWriter(LifecycleTransaction txn,
+ String filename,
+ PartitionColumns columns,
+ EncodingStats stats)
{
- MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
- txn = LifecycleTransaction.offline(OperationType.FLUSH);
+ MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator)
- .commitLogLowerBound(commitLogLowerBound.get())
- .commitLogUpperBound(commitLogUpperBound.get());
-
- return new SSTableTxnWriter(txn,
- cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
- (long) partitions.size(),
- ActiveRepairService.UNREPAIRED_SSTABLE,
- sstableMetadataCollector,
- new SerializationHeader(true, cfs.metadata, columns, stats),
- txn));
++ .commitLogLowerBound(commitLogLowerBound.get())
++ .commitLogUpperBound(commitLogUpperBound.get());
+ return cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
+ (long)toFlush.size(),
+ ActiveRepairService.UNREPAIRED_SSTABLE,
+ sstableMetadataCollector,
+ new SerializationHeader(true, cfs.metadata, columns, stats), txn);
+
}
- catch (Throwable t)
+
+ @Override
+ public SSTableMultiWriter call()
{
- writeSortedContents(context);
- if (txn != null)
- txn.close();
- throw t;
++ writeSortedContents();
+ return writer;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index e00a0a4,f45a47a..93c6c50
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -23,17 -23,8 +23,16 @@@ import java.io.EOFException
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
- import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
@@@ -45,27 -35,23 +44,25 @@@ import com.google.common.base.Throwable
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
- import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Uninterruptibles;
-
import org.apache.commons.lang3.StringUtils;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.db.commitlog.SegmentReader.SyncSegment;
- import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.util.FileSegmentInputStream;
-import org.apache.cassandra.io.util.RebufferingInputStream;
-import org.apache.cassandra.schema.CompressionParams;
-import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.FileDataInput;
@@@ -89,10 -71,10 +86,10 @@@ public class CommitLogReplaye
private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
private final Set<Keyspace> keyspacesRecovered;
- private final List<Future<?>> futures;
+ private final Queue<Future<Integer>> futures;
private final Map<UUID, AtomicInteger> invalidMutations;
private final AtomicInteger replayedCount;
- private final Map<UUID, ReplayPosition> cfPositions;
+ private final Map<UUID, ReplayPosition.ReplayFilter> cfPersisted;
private final ReplayPosition globalPosition;
private final CRC32 checksum;
private byte[] buffer;
@@@ -101,79 -83,12 +98,77 @@@
private final ReplayFilter replayFilter;
private final CommitLogArchiver archiver;
+ /*
+ * Wrapper around initiating mutations read from the log to make it possible
+ * to spy on initiated mutations for test
+ */
+ @VisibleForTesting
+ public static class MutationInitiator
+ {
+ protected Future<Integer> initiateMutation(final Mutation mutation,
+ final long segmentId,
+ final int serializedSize,
- final long entryLocation,
++ final int entryLocation,
+ final CommitLogReplayer clr)
+ {
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow()
+ {
+ if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
+ return;
+ if (clr.pointInTimeExceeded(mutation))
+ return;
+
+ final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+
+ // Rebuild the mutation, omitting column families that
+ // a) the user has requested that we ignore,
+ // b) have already been flushed,
+ // or c) are part of a cf that was dropped.
+ // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
+ Mutation newMutation = null;
+ for (PartitionUpdate update : clr.replayFilter.filter(mutation))
+ {
+ if (Schema.instance.getCF(update.metadata().cfId) == null)
+ continue; // dropped
+
- ReplayPosition rp = clr.cfPositions.get(update.metadata().cfId);
-
+ // replay if current segment is newer than last flushed one or,
+ // if it is the last known segment, if we are after the replay position
- if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
++ if (clr.shouldReplay(update.metadata().cfId, new ReplayPosition(segmentId, entryLocation)))
+ {
+ if (newMutation == null)
+ newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
+ newMutation.add(update);
+ clr.replayedCount.incrementAndGet();
+ }
+ }
+ if (newMutation != null)
+ {
+ assert !newMutation.isEmpty();
+
+ try
+ {
+ Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
+ }
+ catch (ExecutionException e)
+ {
+ throw Throwables.propagate(e.getCause());
+ }
+
+ clr.keyspacesRecovered.add(keyspace);
+ }
+ }
+ };
+ return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize);
+ }
+ }
+
- CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions, ReplayFilter replayFilter)
+ CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition.ReplayFilter> cfPersisted, ReplayFilter replayFilter)
{
this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
- this.futures = new ArrayList<Future<?>>();
+ this.futures = new ArrayDeque<Future<Integer>>();
this.buffer = new byte[4096];
- this.uncompressedBuffer = new byte[4096];
this.invalidMutations = new HashMap<UUID, AtomicInteger>();
// count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
this.replayedCount = new AtomicInteger();
@@@ -342,11 -285,22 +334,24 @@@
}
}
+ /**
+ * consult the known-persisted ranges for our sstables;
+ * if the position is covered by one of them it does not need to be replayed
+ *
+ * @return true iff replay is necessary
+ */
+ private boolean shouldReplay(UUID cfId, ReplayPosition position)
+ {
+ ReplayPosition.ReplayFilter filter = cfPersisted.get(cfId);
+ return filter == null || filter.shouldReplay(position);
+ }
+
+ @SuppressWarnings("resource")
public void recover(File file, boolean tolerateTruncation) throws IOException
{
+ // just transform from the file name (no reading of headers) to determine version
CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
+
try(ChannelProxy channel = new ChannelProxy(file);
RandomAccessReader reader = RandomAccessReader.open(channel))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index dd07b19,16090a1..be1436c
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@@ -329,10 -332,10 +334,10 @@@ public class Tracke
apply(View.markFlushing(memtable));
}
- public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
+ public void replaceFlushed(Memtable memtable, Iterable<SSTableReader> sstables)
{
assert !isDummy();
- if (sstables == null || Iterables.isEmpty(sstables))
- if (sstables.isEmpty())
++ if (Iterables.isEmpty(sstables))
{
// sstable may be null if we flushed batchlog and nothing needed to be retained
// if it's null, we don't care what state the cfstore is in, we just replace it and continue
@@@ -348,14 -351,7 +353,9 @@@
Throwable fail;
fail = updateSizeTracking(emptySet(), sstables, null);
- // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both?
- fail = notifyAdded(sstables, fail);
+ notifyDiscarded(memtable);
+
- if (!isDummy() && !cfstore.isValid())
- dropSSTables();
-
maybeFail(fail);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/View.java
index 63926ed,17062b4..cde6363
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@@ -319,12 -321,14 +321,14 @@@ public class Vie
List<Memtable> flushingMemtables = copyOf(filter(view.flushingMemtables, not(equalTo(memtable))));
assert flushingMemtables.size() == view.flushingMemtables.size() - 1;
- if (flushed == null)
- if (flushed == null || flushed.isEmpty())
++ if (flushed == null || Iterables.isEmpty(flushed))
return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
- view.compactingMap, view.intervalTree);
+ view.compactingMap, view.premature, view.intervalTree);
Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), flushed);
- return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compactingMap,
+ Map<SSTableReader, SSTableReader> compactingMap = replace(view.compactingMap, emptySet(), flushed);
+ Set<SSTableReader> premature = replace(view.premature, emptySet(), flushed);
+ return new View(view.liveMemtables, flushingMemtables, sstableMap, compactingMap, premature,
SSTableIntervalTree.build(sstableMap.keySet()));
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index e030b5b,e0fb3b1..9b6f491
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@@ -120,8 -121,10 +120,10 @@@ public class BigFormat implements SSTab
// switch uncompressed checksums to adler32
// tracks presense of legacy (local and remote) counter shards
// la (2.2.0): new file name format
+ // lb (2.2.7): commit log lower bound included
// ma (3.0.0): swap bf hash order
// store rows natively
- // mb (3.0.6): commit log lower bound included
++ // mb (3.0.7, 3.7): commit log lower bound included
//
// NOTE: when adding a new version, please add that to LegacySSTableTest, too.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index c574a48,53cf0b0..ca50a44
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@@ -20,10 -20,14 +20,11 @@@ package org.apache.cassandra.io.sstable
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import com.google.common.collect.Maps;
+ import com.google.common.collect.Ordering;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.ICardinality;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index d2e0513,420b802..3b3d7e1
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@@ -116,11 -70,9 +116,12 @@@ public class SSTableMetadataViewe
out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000)));
out.printf("SSTable Level: %d%n", stats.sstableLevel);
out.printf("Repaired at: %d%n", stats.repairedAt);
- out.println(stats.replayPosition);
+ out.printf("Minimum replay position: %s\n", stats.commitLogLowerBound);
+ out.printf("Maximum replay position: %s\n", stats.commitLogUpperBound);
+ out.printf("totalColumnsSet: %s%n", stats.totalColumnsSet);
+ out.printf("totalRows: %s%n", stats.totalRows);
out.println("Estimated tombstone drop times:");
+
for (Map.Entry<Double, Long> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())
{
out.printf("%-10s:%10s%n",entry.getKey().intValue(), entry.getValue());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --cc test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index e6f9499,31dea3e..8e45eea
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@@ -461,13 -448,11 +461,13 @@@ public class CommitLogStressTes
super(log, discardedPos, null, ReplayFilter.create());
}
- int hash = 0;
- int cells = 0;
+ int hash;
+ int cells;
+ int discarded;
+ int skipped;
@Override
- void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc)
+ void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
{
if (desc.id < discardedPos.segment)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index 788757c,baf9466..5ac53f6
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@@ -56,65 -50,6 +56,65 @@@ import org.apache.cassandra.db.commitlo
public class RecoveryManagerTest
{
private static Logger logger = LoggerFactory.getLogger(RecoveryManagerTest.class);
+ static final Semaphore blocker = new Semaphore(0);
+ static final Semaphore blocked = new Semaphore(0);
+ static CommitLogReplayer.MutationInitiator originalInitiator = null;
+ static final CommitLogReplayer.MutationInitiator mockInitiator = new CommitLogReplayer.MutationInitiator()
+ {
+ @Override
+ protected Future<Integer> initiateMutation(final Mutation mutation,
+ final long segmentId,
+ final int serializedSize,
- final long entryLocation,
++ final int entryLocation,
+ final CommitLogReplayer clr)
+ {
+ final Future<Integer> toWrap = super.initiateMutation(mutation,
+ segmentId,
+ serializedSize,
+ entryLocation,
+ clr);
+ return new Future<Integer>()
+ {
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isCancelled()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ return blocker.availablePermits() > 0 && toWrap.isDone();
+ }
+
+ @Override
+ public Integer get() throws InterruptedException, ExecutionException
+ {
+ System.out.println("Got blocker once");
+ blocked.release();
+ blocker.acquire();
+ return toWrap.get();
+ }
+
+ @Override
+ public Integer get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException
+ {
+ blocked.release();
+ blocker.tryAcquire(1, timeout, unit);
+ return toWrap.get(timeout, unit);
+ }
+
+ };
+ }
+ };
private static final String KEYSPACE1 = "RecoveryManagerTest1";
private static final String CF_STANDARD1 = "Standard1";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index b5cbf8b,555cdda..1ea0eb1
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -584,147 -555,21 +584,147 @@@ public class CommitLogTes
}
@Test
- public void testDescriptorInvalidParametersSize() throws IOException
+ public void replay_Compressed_Deflate() throws IOException
{
- Map<String, String> params = new HashMap<>();
- for (int i=0; i<65535; ++i)
- params.put("key"+i, Integer.toString(i, 16));
- try {
- CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
- 21,
- new ParameterizedClass("LZ4Compressor", params));
- ByteBuffer buf = ByteBuffer.allocate(1024000);
- CommitLogDescriptor.writeHeader(buf, desc);
- Assert.fail("Parameter object too long should fail on writing descriptor.");
- } catch (ConfigurationException e)
+ replay_Compressed(new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()));
+ }
+
+ private void replay_Compressed(ParameterizedClass parameterizedClass) throws IOException
+ {
+ ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
+ EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
+ try
+ {
+ DatabaseDescriptor.setCommitLogCompression(parameterizedClass);
+ DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
+ CommitLog.instance.resetUnsafe(true);
+
+ replaySimple(CommitLog.instance);
+ replayWithDiscard(CommitLog.instance);
+ }
+ finally
+ {
+ DatabaseDescriptor.setCommitLogCompression(originalCompression);
+ DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
+ CommitLog.instance.resetUnsafe(true);
+ }
+ }
+
+ @Test
+ public void replay_Encrypted() throws IOException
+ {
+ ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
+ EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
+ try
+ {
+ DatabaseDescriptor.setCommitLogCompression(null);
+ DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createContext(true));
+ CommitLog.instance.resetUnsafe(true);
+
+ replaySimple(CommitLog.instance);
+ replayWithDiscard(CommitLog.instance);
+ }
+ finally
+ {
+ DatabaseDescriptor.setCommitLogCompression(originalCompression);
+ DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
+ CommitLog.instance.resetUnsafe(true);
+ }
+ }
+
+ private void replaySimple(CommitLog commitLog) throws IOException
+ {
+ int cellCount = 0;
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k1")
+ .clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
+ cellCount += 1;
+ commitLog.add(rm1);
+
+ final Mutation rm2 = new RowUpdateBuilder(cfs.metadata, 0, "k2")
+ .clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
+ cellCount += 1;
+ commitLog.add(rm2);
+
+ commitLog.sync(true);
+
+ Replayer replayer = new Replayer(commitLog, ReplayPosition.NONE);
+ List<String> activeSegments = commitLog.getActiveSegmentNames();
+ Assert.assertFalse(activeSegments.isEmpty());
+
+ File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name));
+ replayer.recover(files);
+
+ assertEquals(cellCount, replayer.cells);
+ }
+
+ private void replayWithDiscard(CommitLog commitLog) throws IOException
+ {
+ int cellCount = 0;
+ int max = 1024;
+ int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay
+ ReplayPosition replayPosition = null;
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+ for (int i = 0; i < max; i++)
+ {
+ final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k" + 1)
+ .clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
+ ReplayPosition position = commitLog.add(rm1);
+
+ if (i == discardPosition)
+ replayPosition = position;
+ if (i > discardPosition)
+ {
+ cellCount += 1;
+ }
+ }
+
+ commitLog.sync(true);
+
+ Replayer replayer = new Replayer(commitLog, replayPosition);
+ List<String> activeSegments = commitLog.getActiveSegmentNames();
+ Assert.assertFalse(activeSegments.isEmpty());
+
+ File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name));
+ replayer.recover(files);
+
+ assertEquals(cellCount, replayer.cells);
+ }
+
+ class Replayer extends CommitLogReplayer
+ {
+ private final ReplayPosition filterPosition;
+ int cells;
+ int skipped;
+
+ Replayer(CommitLog commitLog, ReplayPosition filterPosition)
+ {
- super(commitLog, filterPosition, null, ReplayFilter.create());
++ super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create());
+ this.filterPosition = filterPosition;
+ }
+
+ @SuppressWarnings("resource")
- void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc) throws IOException
++ void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc) throws IOException
{
- // correct path
+ if (entryLocation <= filterPosition.position)
+ {
+ // Skip over this mutation.
+ skipped++;
+ return;
+ }
+
+ FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
+ Mutation mutation = Mutation.serializer.deserialize(new DataInputPlus.DataInputStreamPlus(bufIn), desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
+ for (PartitionUpdate partitionUpdate : mutation.getPartitionUpdates())
+ for (Row row : partitionUpdate)
+ cells += Iterables.size(row.cells());
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index 0294115,b8de711..902e17a
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@@ -298,11 -298,13 +299,14 @@@ public class TrackerTes
Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2));
SSTableReader reader = MockSchema.sstable(0, 10, false, cfs);
- tracker.replaceFlushed(prev2, Collections.singleton(reader));
+ tracker.replaceFlushed(prev2, singleton(reader));
Assert.assertEquals(1, tracker.getView().sstables.size());
+ Assert.assertEquals(1, tracker.getView().premature.size());
+ tracker.permitCompactionOfFlushed(singleton(reader));
+ Assert.assertEquals(0, tracker.getView().premature.size());
- Assert.assertEquals(1, listener.received.size());
- Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
+ Assert.assertEquals(2, listener.received.size());
- Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
- Assert.assertEquals(prev2, ((MemtableDiscardedNotification) listener.received.get(1)).memtable);
++ Assert.assertEquals(prev2, ((MemtableDiscardedNotification) listener.received.get(0)).memtable);
++ Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(1)).added);
listener.received.clear();
Assert.assertTrue(reader.isKeyCacheSetup());
Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount());
@@@ -316,17 -318,13 +320,17 @@@
tracker.markFlushing(prev1);
reader = MockSchema.sstable(0, 10, true, cfs);
cfs.invalidate(false);
- tracker.replaceFlushed(prev1, Collections.singleton(reader));
+ tracker.replaceFlushed(prev1, singleton(reader));
+ tracker.permitCompactionOfFlushed(Collections.singleton(reader));
Assert.assertEquals(0, tracker.getView().sstables.size());
Assert.assertEquals(0, tracker.getView().flushingMemtables.size());
Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
- Assert.assertEquals(reader, (((SSTableDeletingNotification) listener.received.get(0)).deleting));
- Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(1)).removed.size());
+ System.out.println(listener.received);
- Assert.assertEquals(5, listener.received.size());
++ Assert.assertEquals(4, listener.received.size());
+ Assert.assertEquals(prev1, ((MemtableSwitchedNotification) listener.received.get(0)).memtable);
- Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(1)).added);
- Assert.assertEquals(prev1, ((MemtableDiscardedNotification) listener.received.get(2)).memtable);
- Assert.assertTrue(listener.received.get(3) instanceof SSTableDeletingNotification);
- Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(4)).removed.size());
++ Assert.assertEquals(prev1, ((MemtableDiscardedNotification) listener.received.get(1)).memtable);
++ Assert.assertTrue(listener.received.get(2) instanceof SSTableDeletingNotification);
++ Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(3)).removed.size());
DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java
----------------------------------------------------------------------