You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/08/05 13:49:10 UTC
[19/23] cassandra git commit: Merge commit
'904cb5d10e0de1a6ca89249be8c257ed38a80ef0' into cassandra-3.9
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index b1c706e,5a3d524..f464e08
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@@ -353,35 -347,13 +349,16 @@@ public class Tracke
Throwable fail;
fail = updateSizeTracking(emptySet(), sstables, null);
+
+ notifyDiscarded(memtable);
+
- maybeFail(fail);
- }
-
- /**
- * permit compaction of the provided sstable; this translates to notifying compaction
- * strategies of its existence, and potentially submitting a background task
- */
- public void permitCompactionOfFlushed(Collection<SSTableReader> sstables)
- {
- if (sstables.isEmpty())
- return;
+ // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both?
+ fail = notifyAdded(sstables, fail);
- apply(View.permitCompactionOfFlushed(sstables));
-
- if (isDummy())
- return;
-
- if (cfstore.isValid())
- {
- notifyAdded(sstables);
- CompactionManager.instance.submitBackground(cfstore);
- }
- else
- {
+ if (!isDummy() && !cfstore.isValid())
dropSSTables();
- }
+
+ maybeFail(fail);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/View.java
index a5c781d,4b3aae0..b26426d
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@@ -40,7 -39,7 +39,6 @@@ import static com.google.common.collect
import static com.google.common.collect.Iterables.all;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.filter;
--import static com.google.common.collect.Iterables.transform;
import static org.apache.cassandra.db.lifecycle.Helpers.emptySet;
import static org.apache.cassandra.db.lifecycle.Helpers.filterOut;
import static org.apache.cassandra.db.lifecycle.Helpers.replace;
@@@ -336,14 -333,12 +332,12 @@@ public class Vie
List<Memtable> flushingMemtables = copyOf(filter(view.flushingMemtables, not(equalTo(memtable))));
assert flushingMemtables.size() == view.flushingMemtables.size() - 1;
- if (flushed == null || flushed.isEmpty())
+ if (flushed == null || Iterables.isEmpty(flushed))
return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
- view.compactingMap, view.premature, view.intervalTree);
+ view.compactingMap, view.intervalTree);
Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), flushed);
- Map<SSTableReader, SSTableReader> compactingMap = replace(view.compactingMap, emptySet(), flushed);
- Set<SSTableReader> premature = replace(view.premature, emptySet(), flushed);
- return new View(view.liveMemtables, flushingMemtables, sstableMap, compactingMap, premature,
+ return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compactingMap,
SSTableIntervalTree.build(sstableMap.keySet()));
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index 505de49,a683513..14e391b
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@@ -24,7 -24,8 +24,8 @@@ import java.util.*
import com.google.common.collect.Maps;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+ import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.Version;
@@@ -35,6 -36,8 +36,8 @@@ import org.apache.cassandra.utils.ByteB
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.cassandra.utils.StreamingHistogram;
-import static org.apache.cassandra.io.sstable.metadata.StatsMetadata.replayPositionSetSerializer;
++import static org.apache.cassandra.io.sstable.metadata.StatsMetadata.commitLogPositionSetSerializer;
+
/**
* Serializer for SSTable from legacy versions
*/
@@@ -55,7 -58,7 +58,7 @@@ public class LegacyMetadataSerializer e
EstimatedHistogram.serializer.serialize(stats.estimatedPartitionSize, out);
EstimatedHistogram.serializer.serialize(stats.estimatedColumnCount, out);
- CommitLogPosition.serializer.serialize(stats.commitLogUpperBound, out);
- ReplayPosition.serializer.serialize(stats.commitLogIntervals.upperBound().orElse(ReplayPosition.NONE), out);
++ CommitLogPosition.serializer.serialize(stats.commitLogIntervals.upperBound().orElse(CommitLogPosition.NONE), out);
out.writeLong(stats.minTimestamp);
out.writeLong(stats.maxTimestamp);
out.writeInt(stats.maxLocalDeletionTime);
@@@ -72,7 -75,9 +75,9 @@@
for (ByteBuffer value : stats.maxClusteringValues)
ByteBufferUtil.writeWithShortLength(value, out);
if (version.hasCommitLogLowerBound())
- CommitLogPosition.serializer.serialize(stats.commitLogLowerBound, out);
- ReplayPosition.serializer.serialize(stats.commitLogIntervals.lowerBound().orElse(ReplayPosition.NONE), out);
++ CommitLogPosition.serializer.serialize(stats.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE), out);
+ if (version.hasCommitLogIntervals())
- replayPositionSetSerializer.serialize(stats.commitLogIntervals, out);
++ commitLogPositionSetSerializer.serialize(stats.commitLogIntervals, out);
}
/**
@@@ -120,7 -125,12 +125,12 @@@
maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
if (descriptor.version.hasCommitLogLowerBound())
- commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
- IntervalSet<ReplayPosition> commitLogIntervals;
+ commitLogLowerBound = CommitLogPosition.serializer.deserialize(in);
++ IntervalSet<CommitLogPosition> commitLogIntervals;
+ if (descriptor.version.hasCommitLogIntervals())
- commitLogIntervals = replayPositionSetSerializer.deserialize(in);
++ commitLogIntervals = commitLogPositionSetSerializer.deserialize(in);
+ else
+ commitLogIntervals = new IntervalSet<>(commitLogLowerBound, commitLogUpperBound);
if (types.contains(MetadataType.VALIDATION))
components.put(MetadataType.VALIDATION,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 299bc87,1ff2ca8..196cfbf
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@@ -20,16 -20,18 +20,16 @@@ package org.apache.cassandra.io.sstable
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import com.google.common.collect.Maps;
- import com.google.common.collect.Ordering;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.ICardinality;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+ import org.apache.cassandra.db.commitlog.IntervalSet;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
import org.apache.cassandra.db.rows.Cell;
@@@ -88,8 -89,7 +87,7 @@@ public class MetadataCollector implemen
protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram();
// TODO: cound the number of row per partition (either with the number of cells, or instead)
protected EstimatedHistogram estimatedCellPerPartitionCount = defaultCellPerPartitionCountHistogram();
- protected CommitLogPosition commitLogLowerBound = CommitLogPosition.NONE;
- protected CommitLogPosition commitLogUpperBound = CommitLogPosition.NONE;
- protected IntervalSet commitLogIntervals = IntervalSet.empty();
++ protected IntervalSet<CommitLogPosition> commitLogIntervals = IntervalSet.empty();
protected final MinMaxLongTracker timestampTracker = new MinMaxLongTracker();
protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(Cell.NO_DELETION_TIME, Cell.NO_DELETION_TIME);
protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(Cell.NO_TTL, Cell.NO_TTL);
@@@ -123,23 -123,13 +121,13 @@@
{
this(comparator);
- CommitLogPosition min = null, max = null;
- IntervalSet.Builder intervals = new IntervalSet.Builder();
++ IntervalSet.Builder<CommitLogPosition> intervals = new IntervalSet.Builder<>();
for (SSTableReader sstable : sstables)
{
- if (min == null)
- {
- min = sstable.getSSTableMetadata().commitLogLowerBound;
- max = sstable.getSSTableMetadata().commitLogUpperBound;
- }
- else
- {
- min = Ordering.natural().min(min, sstable.getSSTableMetadata().commitLogLowerBound);
- max = Ordering.natural().max(max, sstable.getSSTableMetadata().commitLogUpperBound);
- }
+ intervals.addAll(sstable.getSSTableMetadata().commitLogIntervals);
}
- commitLogLowerBound(min);
- commitLogUpperBound(max);
+ commitLogIntervals(intervals.build());
sstableLevel(level);
}
@@@ -226,15 -216,9 +214,9 @@@
ttlTracker.update(newTTL);
}
- public MetadataCollector commitLogLowerBound(CommitLogPosition commitLogLowerBound)
- public MetadataCollector commitLogIntervals(IntervalSet commitLogIntervals)
++ public MetadataCollector commitLogIntervals(IntervalSet<CommitLogPosition> commitLogIntervals)
{
- this.commitLogLowerBound = commitLogLowerBound;
- return this;
- }
-
- public MetadataCollector commitLogUpperBound(CommitLogPosition commitLogUpperBound)
- {
- this.commitLogUpperBound = commitLogUpperBound;
+ this.commitLogIntervals = commitLogIntervals;
return this;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index e765235,9971eaa..c83c2cf
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@@ -26,7 -27,8 +27,8 @@@ import org.apache.cassandra.io.sstable.
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+ import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.ByteBufferUtil;
@@@ -39,11 -41,11 +41,11 @@@ import org.apache.cassandra.utils.Strea
public class StatsMetadata extends MetadataComponent
{
public static final IMetadataComponentSerializer serializer = new StatsMetadataSerializer();
- public static final ISerializer<IntervalSet<ReplayPosition>> replayPositionSetSerializer = IntervalSet.serializer(ReplayPosition.serializer);
++ public static final ISerializer<IntervalSet<CommitLogPosition>> commitLogPositionSetSerializer = IntervalSet.serializer(CommitLogPosition.serializer);
public final EstimatedHistogram estimatedPartitionSize;
public final EstimatedHistogram estimatedColumnCount;
- public final CommitLogPosition commitLogLowerBound;
- public final CommitLogPosition commitLogUpperBound;
- public final IntervalSet<ReplayPosition> commitLogIntervals;
++ public final IntervalSet<CommitLogPosition> commitLogIntervals;
public final long minTimestamp;
public final long maxTimestamp;
public final int minLocalDeletionTime;
@@@ -62,8 -64,7 +64,7 @@@
public StatsMetadata(EstimatedHistogram estimatedPartitionSize,
EstimatedHistogram estimatedColumnCount,
- CommitLogPosition commitLogLowerBound,
- CommitLogPosition commitLogUpperBound,
- IntervalSet<ReplayPosition> commitLogIntervals,
++ IntervalSet<CommitLogPosition> commitLogIntervals,
long minTimestamp,
long maxTimestamp,
int minLocalDeletionTime,
@@@ -239,7 -235,7 +235,7 @@@
int size = 0;
size += EstimatedHistogram.serializer.serializedSize(component.estimatedPartitionSize);
size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount);
- size += CommitLogPosition.serializer.serializedSize(component.commitLogUpperBound);
- size += ReplayPosition.serializer.serializedSize(component.commitLogIntervals.upperBound().orElse(ReplayPosition.NONE));
++ size += CommitLogPosition.serializer.serializedSize(component.commitLogIntervals.upperBound().orElse(CommitLogPosition.NONE));
if (version.storeRows())
size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // mix/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long)
else
@@@ -258,7 -254,9 +254,9 @@@
if (version.storeRows())
size += 8 + 8; // totalColumnsSet, totalRows
if (version.hasCommitLogLowerBound())
- size += CommitLogPosition.serializer.serializedSize(component.commitLogLowerBound);
- size += ReplayPosition.serializer.serializedSize(component.commitLogIntervals.lowerBound().orElse(ReplayPosition.NONE));
++ size += CommitLogPosition.serializer.serializedSize(component.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE));
+ if (version.hasCommitLogIntervals())
- size += replayPositionSetSerializer.serializedSize(component.commitLogIntervals);
++ size += commitLogPositionSetSerializer.serializedSize(component.commitLogIntervals);
return size;
}
@@@ -266,7 -264,7 +264,7 @@@
{
EstimatedHistogram.serializer.serialize(component.estimatedPartitionSize, out);
EstimatedHistogram.serializer.serialize(component.estimatedColumnCount, out);
- CommitLogPosition.serializer.serialize(component.commitLogUpperBound, out);
- ReplayPosition.serializer.serialize(component.commitLogIntervals.upperBound().orElse(ReplayPosition.NONE), out);
++ CommitLogPosition.serializer.serialize(component.commitLogIntervals.upperBound().orElse(CommitLogPosition.NONE), out);
out.writeLong(component.minTimestamp);
out.writeLong(component.maxTimestamp);
if (version.storeRows())
@@@ -296,7 -294,9 +294,9 @@@
}
if (version.hasCommitLogLowerBound())
- CommitLogPosition.serializer.serialize(component.commitLogLowerBound, out);
- ReplayPosition.serializer.serialize(component.commitLogIntervals.lowerBound().orElse(ReplayPosition.NONE), out);
++ CommitLogPosition.serializer.serialize(component.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE), out);
+ if (version.hasCommitLogIntervals())
- replayPositionSetSerializer.serialize(component.commitLogIntervals, out);
++ commitLogPositionSetSerializer.serialize(component.commitLogIntervals, out);
}
public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException
@@@ -337,7 -337,12 +337,12 @@@
long totalRows = version.storeRows() ? in.readLong() : -1L;
if (version.hasCommitLogLowerBound())
- commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
- IntervalSet<ReplayPosition> commitLogIntervals;
+ commitLogLowerBound = CommitLogPosition.serializer.deserialize(in);
++ IntervalSet<CommitLogPosition> commitLogIntervals;
+ if (version.hasCommitLogIntervals())
- commitLogIntervals = replayPositionSetSerializer.deserialize(in);
++ commitLogIntervals = commitLogPositionSetSerializer.deserialize(in);
+ else
- commitLogIntervals = new IntervalSet<ReplayPosition>(commitLogLowerBound, commitLogUpperBound);
++ commitLogIntervals = new IntervalSet<CommitLogPosition>(commitLogLowerBound, commitLogUpperBound);
return new StatsMetadata(partitionSizes,
columnCounts,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index 3c8ba64,5f7513f..6686684
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@@ -112,15 -70,11 +112,14 @@@ public class SSTableMetadataViewe
out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000)));
out.printf("SSTable Level: %d%n", stats.sstableLevel);
out.printf("Repaired at: %d%n", stats.repairedAt);
- out.printf("Minimum replay position: %s\n", stats.commitLogLowerBound);
- out.printf("Maximum replay position: %s\n", stats.commitLogUpperBound);
+ out.printf("Replay positions covered: %s\n", stats.commitLogIntervals);
+ out.printf("totalColumnsSet: %s%n", stats.totalColumnsSet);
+ out.printf("totalRows: %s%n", stats.totalRows);
out.println("Estimated tombstone drop times:");
- for (Map.Entry<Double, Long> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())
+
+ for (Map.Entry<Number, long[]> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())
{
- out.printf("%-10s:%10s%n",entry.getKey().intValue(), entry.getValue());
+ out.printf("%-10s:%10s%n",entry.getKey().intValue(), entry.getValue()[0]);
}
printHistograms(stats, out);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --cc test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 239077e,02b26c7..2858597
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@@ -250,9 -245,9 +250,10 @@@ public class CommitLogStressTes
}
verifySizes(commitLog);
- commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, discardedPos);
+ commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId,
- ReplayPosition.NONE, discardedPos);
++ CommitLogPosition.NONE, discardedPos);
threads.clear();
+
System.out.format("Discarded at %s\n", discardedPos);
verifySizes(commitLog);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/Util.java
index cd709d5,d04ca9b..7bcee7a
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@@ -677,9 -648,26 +679,26 @@@ public class Uti
}
private UnfilteredPartitionIterator queryStorageInternal(ColumnFamilyStore cfs,
- ReadOrderGroup orderGroup)
+ ReadExecutionController controller)
{
- return queryStorage(cfs, orderGroup);
+ return queryStorage(cfs, controller);
}
}
+
+ public static Closeable markDirectoriesUnwriteable(ColumnFamilyStore cfs)
+ {
+ try
+ {
+ for ( ; ; )
+ {
+ DataDirectory dir = cfs.getDirectories().getWriteableLocation(1);
+ BlacklistedDirectories.maybeMarkUnwritable(cfs.getDirectories().getLocationForDisk(dir));
+ }
+ }
+ catch (IOError e)
+ {
+ // Expected -- marked all directories as unwritable
+ }
+ return () -> BlacklistedDirectories.clearUnwritableUnsafe();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 23ec58b,9a0ddb8..6ab7d46
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -20,9 -20,13 +20,10 @@@ package org.apache.cassandra.db.commitl
import java.io.*;
import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+ import java.util.function.BiConsumer;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
@@@ -35,34 -40,27 +36,34 @@@ import org.junit.runners.Parameterized.
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
- import org.apache.cassandra.db.ColumnFamilyStore;
- import org.apache.cassandra.db.Keyspace;
- import org.apache.cassandra.db.Mutation;
- import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.config.Config.DiskFailurePolicy;
+ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.DeflateCompressor;
import org.apache.cassandra.io.compress.LZ4Compressor;
import org.apache.cassandra.io.compress.SnappyCompressor;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.security.EncryptionContextGenerator;
+import org.apache.cassandra.utils.Hex;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.KillerForTests;
+import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.vint.VIntCoding;
+import org.junit.After;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@@ -247,13 -228,13 +248,13 @@@ public class CommitLogTes
.build();
CommitLog.instance.add(m2);
- assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
+ assertEquals(2, CommitLog.instance.segmentManager.getActiveSegments().size());
UUID cfid2 = m2.getColumnFamilyIds().iterator().next();
- CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getCurrentPosition());
- CommitLog.instance.discardCompletedSegments(cfid2, ReplayPosition.NONE, CommitLog.instance.getContext());
++ CommitLog.instance.discardCompletedSegments(cfid2, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
- // Assert we still have both our segment
- assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
+ // Assert we still have both our segments
+ assertEquals(2, CommitLog.instance.segmentManager.getActiveSegments().size());
}
@Test
@@@ -278,9 -258,9 +279,9 @@@
// "Flush": this won't delete anything
UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
CommitLog.instance.sync(true);
- CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getCurrentPosition());
- CommitLog.instance.discardCompletedSegments(cfid1, ReplayPosition.NONE, CommitLog.instance.getContext());
++ CommitLog.instance.discardCompletedSegments(cfid1, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
- assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
+ assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
// Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
@@@ -298,10 -279,10 +299,10 @@@
// didn't write anything on cf1 since last flush (and we flush cf2)
UUID cfid2 = rm2.getColumnFamilyIds().iterator().next();
- CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getCurrentPosition());
- CommitLog.instance.discardCompletedSegments(cfid2, ReplayPosition.NONE, CommitLog.instance.getContext());
++ CommitLog.instance.discardCompletedSegments(cfid2, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
// Assert we still have both our segment
- assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
+ assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
}
private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String cfName, String colName)
@@@ -545,13 -486,13 +546,13 @@@
for (int i = 0 ; i < 5 ; i++)
CommitLog.instance.add(m2);
- assertEquals(2, CommitLog.instance.activeSegments());
- ReplayPosition position = CommitLog.instance.getContext();
- for (Keyspace ks : Keyspace.system())
- for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
- CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, ReplayPosition.NONE, position);
- CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, ReplayPosition.NONE, position);
- assertEquals(1, CommitLog.instance.activeSegments());
+ assertEquals(2, CommitLog.instance.segmentManager.getActiveSegments().size());
+ CommitLogPosition position = CommitLog.instance.getCurrentPosition();
+ for (Keyspace keyspace : Keyspace.system())
+ for (ColumnFamilyStore syscfs : keyspace.getColumnFamilyStores())
- CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
- CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position);
++ CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, CommitLogPosition.NONE, position);
++ CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, CommitLogPosition.NONE, position);
+ assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
}
finally
{
@@@ -589,108 -530,136 +590,240 @@@
}
@Test
+ public void replaySimple() throws IOException
+ {
+ int cellCount = 0;
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k1")
+ .clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
+ cellCount += 1;
+ CommitLog.instance.add(rm1);
+
+ final Mutation rm2 = new RowUpdateBuilder(cfs.metadata, 0, "k2")
+ .clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
+ cellCount += 1;
+ CommitLog.instance.add(rm2);
+
+ CommitLog.instance.sync(true);
+
+ SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata);
+ List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
+ Assert.assertFalse(activeSegments.isEmpty());
+
+ File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name));
+ replayer.replayFiles(files);
+
+ assertEquals(cellCount, replayer.cells);
+ }
+
+ @Test
+ public void replayWithDiscard() throws IOException
+ {
+ int cellCount = 0;
+ int max = 1024;
+ int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay
+ CommitLogPosition commitLogPosition = null;
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+ for (int i = 0; i < max; i++)
+ {
+ final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k" + 1)
+ .clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
+ CommitLogPosition position = CommitLog.instance.add(rm1);
+
+ if (i == discardPosition)
+ commitLogPosition = position;
+ if (i > discardPosition)
+ {
+ cellCount += 1;
+ }
+ }
+
+ CommitLog.instance.sync(true);
+
+ SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata);
+ List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
+ Assert.assertFalse(activeSegments.isEmpty());
+
+ File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name));
+ replayer.replayFiles(files);
+
+ assertEquals(cellCount, replayer.cells);
+ }
+
+ class SimpleCountingReplayer extends CommitLogReplayer
+ {
+ private final CommitLogPosition filterPosition;
+ private final CFMetaData metadata;
+ int cells;
+ int skipped;
+
+ SimpleCountingReplayer(CommitLog commitLog, CommitLogPosition filterPosition, CFMetaData cfm)
+ {
+ super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create());
+ this.filterPosition = filterPosition;
+ this.metadata = cfm;
+ }
+
+ @SuppressWarnings("resource")
+ @Override
+ public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc)
+ {
+ if (entryLocation <= filterPosition.position)
+ {
+ // Skip over this mutation.
+ skipped++;
+ return;
+ }
+ for (PartitionUpdate partitionUpdate : m.getPartitionUpdates())
+ {
+ // Only process mutations for the CF's we're testing against, since we can't deterministically predict
+ // whether or not system keyspaces will be mutated during a test.
+ if (partitionUpdate.metadata().cfName.equals(metadata.cfName))
+ {
+ for (Row row : partitionUpdate)
+ cells += Iterables.size(row.cells());
+ }
+ }
+ }
+ }
++
+ public void testUnwriteableFlushRecovery() throws ExecutionException, InterruptedException, IOException
+ {
+ CommitLog.instance.resetUnsafe(true);
+
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+ DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy();
+ try
+ {
+ DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.ignore);
+
+ for (int i = 0 ; i < 5 ; i++)
+ {
+ new RowUpdateBuilder(cfs.metadata, 0, "k")
+ .clustering("c" + i).add("val", ByteBuffer.allocate(100))
+ .build()
+ .apply();
+
+ if (i == 2)
+ {
+ try (Closeable c = Util.markDirectoriesUnwriteable(cfs))
+ {
+ cfs.forceBlockingFlush();
+ }
+ catch (Throwable t)
+ {
+ // expected. Cause (after some wrappings) should be a write error
+ while (!(t instanceof FSWriteError))
+ t = t.getCause();
+ }
+ }
+ else
+ cfs.forceBlockingFlush();
+ }
+ }
+ finally
+ {
+ DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
+ }
+
+ CommitLog.instance.sync(true);
+ System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
+ // Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog.
+ // If retries work subsequent flushes should clear up error and this should change to expect 0.
+ Assert.assertEquals(1, CommitLog.instance.resetUnsafe(false));
+ }
+
+ public void testOutOfOrderFlushRecovery(BiConsumer<ColumnFamilyStore, Memtable> flushAction, boolean performCompaction)
+ throws ExecutionException, InterruptedException, IOException
+ {
+ CommitLog.instance.resetUnsafe(true);
+
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+ for (int i = 0 ; i < 5 ; i++)
+ {
+ new RowUpdateBuilder(cfs.metadata, 0, "k")
+ .clustering("c" + i).add("val", ByteBuffer.allocate(100))
+ .build()
+ .apply();
+
+ Memtable current = cfs.getTracker().getView().getCurrentMemtable();
+ if (i == 2)
+ current.makeUnflushable();
+
+ flushAction.accept(cfs, current);
+ }
+ if (performCompaction)
+ cfs.forceMajorCompaction();
+ // Make sure metadata saves and reads fine
+ for (SSTableReader reader : cfs.getLiveSSTables())
+ reader.reloadSSTableMetadata();
+
+ CommitLog.instance.sync(true);
+ System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
+ // In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have
+ // persisted all data in the commit log. Because we know there was an error, there must be something left to
+ // replay.
+ Assert.assertEquals(1, CommitLog.instance.resetUnsafe(false));
+ }
+
+ BiConsumer<ColumnFamilyStore, Memtable> flush = (cfs, current) ->
+ {
+ try
+ {
+ cfs.forceBlockingFlush();
+ }
+ catch (Throwable t)
+ {
+ // expected after makeUnflushable. Cause (after some wrappings) should be a write error
+ while (!(t instanceof FSWriteError))
+ t = t.getCause();
+ // Wait for started flushes to complete.
+ cfs.switchMemtableIfCurrent(current);
+ }
+ };
+
+ BiConsumer<ColumnFamilyStore, Memtable> recycleSegments = (cfs, current) ->
+ {
+ // Move to new commit log segment and try to flush all data. Also delete segments that no longer contain
+ // flushed data.
+ // This does not stop on errors and should retain segments for which flushing failed.
+ CommitLog.instance.forceRecycleAllSegments();
+
+ // Wait for started flushes to complete.
+ cfs.switchMemtableIfCurrent(current);
+ };
+
+ @Test
+ public void testOutOfOrderFlushRecovery() throws ExecutionException, InterruptedException, IOException
+ {
+ testOutOfOrderFlushRecovery(flush, false);
+ }
+
+ @Test
+ public void testOutOfOrderLogDiscard() throws ExecutionException, InterruptedException, IOException
+ {
+ testOutOfOrderFlushRecovery(recycleSegments, false);
+ }
+
+ @Test
+ public void testOutOfOrderFlushRecoveryWithCompaction() throws ExecutionException, InterruptedException, IOException
+ {
+ testOutOfOrderFlushRecovery(flush, true);
+ }
+
+ @Test
+ public void testOutOfOrderLogDiscardWithCompaction() throws ExecutionException, InterruptedException, IOException
+ {
+ testOutOfOrderFlushRecovery(recycleSegments, true);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index 1668ddc,479e4e2..84e3e05
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@@ -299,14 -298,10 +299,11 @@@ public class TrackerTes
Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2));
SSTableReader reader = MockSchema.sstable(0, 10, false, cfs);
- tracker.replaceFlushed(prev2, Collections.singleton(reader));
+ tracker.replaceFlushed(prev2, singleton(reader));
Assert.assertEquals(1, tracker.getView().sstables.size());
- Assert.assertEquals(1, tracker.getView().premature.size());
- tracker.permitCompactionOfFlushed(singleton(reader));
- Assert.assertEquals(0, tracker.getView().premature.size());
- Assert.assertEquals(1, listener.received.size());
- Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
+ Assert.assertEquals(2, listener.received.size());
+ Assert.assertEquals(prev2, ((MemtableDiscardedNotification) listener.received.get(0)).memtable);
+ Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(1)).added);
listener.received.clear();
Assert.assertTrue(reader.isKeyCacheSetup());
Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount());
@@@ -324,13 -319,10 +321,12 @@@
Assert.assertEquals(0, tracker.getView().sstables.size());
Assert.assertEquals(0, tracker.getView().flushingMemtables.size());
Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
- System.out.println(listener.received);
- Assert.assertEquals(4, listener.received.size());
- Assert.assertEquals(3, listener.received.size());
- Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
- Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification);
- Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(2)).removed.size());
++ Assert.assertEquals(5, listener.received.size());
+ Assert.assertEquals(prev1, ((MemtableSwitchedNotification) listener.received.get(0)).memtable);
+ Assert.assertEquals(prev1, ((MemtableDiscardedNotification) listener.received.get(1)).memtable);
- Assert.assertTrue(listener.received.get(2) instanceof SSTableDeletingNotification);
- Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(3)).removed.size());
++ Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(2)).added);
++ Assert.assertTrue(listener.received.get(3) instanceof SSTableDeletingNotification);
++ Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(4)).removed.size());
DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index a3382c4,de12d57..4bd4489
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@@ -32,7 -32,8 +32,8 @@@ import org.apache.cassandra.SchemaLoade
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+ import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
@@@ -85,8 -86,7 +86,7 @@@ public class MetadataSerializerTes
CFMetaData cfm = SchemaLoader.standardCFMD("ks1", "cf1");
MetadataCollector collector = new MetadataCollector(cfm.comparator)
- .commitLogLowerBound(cllb)
- .commitLogUpperBound(club);
- .commitLogIntervals(new IntervalSet(cllb, club));
++ .commitLogIntervals(new IntervalSet<>(cllb, club));
String partitioner = RandomPartitioner.class.getCanonicalName();
double bfFpChance = 0.1;