You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/05/12 13:23:12 UTC
[07/20] cassandra git commit: Fix commit log replay after out-of-order flush completion.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index 30ed85b..ca7fe82 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.FileDataInput;
@@ -49,7 +50,7 @@ public class MetadataSerializer implements IMetadataSerializer
{
private static final Logger logger = LoggerFactory.getLogger(MetadataSerializer.class);
- public void serialize(Map<MetadataType, MetadataComponent> components, DataOutputPlus out) throws IOException
+ public void serialize(Map<MetadataType, MetadataComponent> components, Version version, DataOutputPlus out) throws IOException
{
// sort components by type
List<MetadataComponent> sortedComponents = Lists.newArrayList(components.values());
@@ -66,12 +67,12 @@ public class MetadataSerializer implements IMetadataSerializer
out.writeInt(type.ordinal());
// serialize position
out.writeInt(lastPosition);
- lastPosition += type.serializer.serializedSize(component);
+ lastPosition += type.serializer.serializedSize(component, version);
}
// serialize components
for (MetadataComponent component : sortedComponents)
{
- component.getType().serializer.serialize(component, out);
+ component.getType().serializer.serialize(component, version, out);
}
}
@@ -153,7 +154,7 @@ public class MetadataSerializer implements IMetadataSerializer
try (DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(tmpDescriptor.filenameFor(Component.STATS))))
{
- serialize(currentComponents, out);
+ serialize(currentComponents, descriptor.version, out);
out.flush();
}
// we cant move a file on top of another file in windows:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index f2eb1af..3d48e34 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -43,7 +43,8 @@ public class StatsMetadata extends MetadataComponent
public final EstimatedHistogram estimatedRowSize;
public final EstimatedHistogram estimatedColumnCount;
- public final ReplayPosition replayPosition;
+ public final ReplayPosition commitLogLowerBound;
+ public final ReplayPosition commitLogUpperBound;
public final long minTimestamp;
public final long maxTimestamp;
public final int maxLocalDeletionTime;
@@ -57,7 +58,8 @@ public class StatsMetadata extends MetadataComponent
public StatsMetadata(EstimatedHistogram estimatedRowSize,
EstimatedHistogram estimatedColumnCount,
- ReplayPosition replayPosition,
+ ReplayPosition commitLogLowerBound,
+ ReplayPosition commitLogUpperBound,
long minTimestamp,
long maxTimestamp,
int maxLocalDeletionTime,
@@ -71,7 +73,8 @@ public class StatsMetadata extends MetadataComponent
{
this.estimatedRowSize = estimatedRowSize;
this.estimatedColumnCount = estimatedColumnCount;
- this.replayPosition = replayPosition;
+ this.commitLogLowerBound = commitLogLowerBound;
+ this.commitLogUpperBound = commitLogUpperBound;
this.minTimestamp = minTimestamp;
this.maxTimestamp = maxTimestamp;
this.maxLocalDeletionTime = maxLocalDeletionTime;
@@ -117,7 +120,8 @@ public class StatsMetadata extends MetadataComponent
{
return new StatsMetadata(estimatedRowSize,
estimatedColumnCount,
- replayPosition,
+ commitLogLowerBound,
+ commitLogUpperBound,
minTimestamp,
maxTimestamp,
maxLocalDeletionTime,
@@ -134,7 +138,8 @@ public class StatsMetadata extends MetadataComponent
{
return new StatsMetadata(estimatedRowSize,
estimatedColumnCount,
- replayPosition,
+ commitLogLowerBound,
+ commitLogUpperBound,
minTimestamp,
maxTimestamp,
maxLocalDeletionTime,
@@ -157,7 +162,8 @@ public class StatsMetadata extends MetadataComponent
return new EqualsBuilder()
.append(estimatedRowSize, that.estimatedRowSize)
.append(estimatedColumnCount, that.estimatedColumnCount)
- .append(replayPosition, that.replayPosition)
+ .append(commitLogLowerBound, that.commitLogLowerBound)
+ .append(commitLogUpperBound, that.commitLogUpperBound)
.append(minTimestamp, that.minTimestamp)
.append(maxTimestamp, that.maxTimestamp)
.append(maxLocalDeletionTime, that.maxLocalDeletionTime)
@@ -177,7 +183,8 @@ public class StatsMetadata extends MetadataComponent
return new HashCodeBuilder()
.append(estimatedRowSize)
.append(estimatedColumnCount)
- .append(replayPosition)
+ .append(commitLogLowerBound)
+ .append(commitLogUpperBound)
.append(minTimestamp)
.append(maxTimestamp)
.append(maxLocalDeletionTime)
@@ -193,12 +200,12 @@ public class StatsMetadata extends MetadataComponent
public static class StatsMetadataSerializer implements IMetadataComponentSerializer<StatsMetadata>
{
- public int serializedSize(StatsMetadata component) throws IOException
+ public int serializedSize(StatsMetadata component, Version version) throws IOException
{
int size = 0;
size += EstimatedHistogram.serializer.serializedSize(component.estimatedRowSize, TypeSizes.NATIVE);
size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount, TypeSizes.NATIVE);
- size += ReplayPosition.serializer.serializedSize(component.replayPosition, TypeSizes.NATIVE);
+ size += ReplayPosition.serializer.serializedSize(component.commitLogUpperBound, TypeSizes.NATIVE);
size += 8 + 8 + 4 + 8 + 8; // mix/max timestamp(long), maxLocalDeletionTime(int), compressionRatio(double), repairedAt (long)
size += StreamingHistogram.serializer.serializedSize(component.estimatedTombstoneDropTime, TypeSizes.NATIVE);
size += TypeSizes.NATIVE.sizeof(component.sstableLevel);
@@ -211,14 +218,16 @@ public class StatsMetadata extends MetadataComponent
for (ByteBuffer columnName : component.maxColumnNames)
size += 2 + columnName.remaining(); // with short length
size += TypeSizes.NATIVE.sizeof(component.hasLegacyCounterShards);
+ if (version.hasCommitLogLowerBound())
+ size += ReplayPosition.serializer.serializedSize(component.commitLogLowerBound, TypeSizes.NATIVE);
return size;
}
- public void serialize(StatsMetadata component, DataOutputPlus out) throws IOException
+ public void serialize(StatsMetadata component, Version version, DataOutputPlus out) throws IOException
{
EstimatedHistogram.serializer.serialize(component.estimatedRowSize, out);
EstimatedHistogram.serializer.serialize(component.estimatedColumnCount, out);
- ReplayPosition.serializer.serialize(component.replayPosition, out);
+ ReplayPosition.serializer.serialize(component.commitLogUpperBound, out);
out.writeLong(component.minTimestamp);
out.writeLong(component.maxTimestamp);
out.writeInt(component.maxLocalDeletionTime);
@@ -233,13 +242,16 @@ public class StatsMetadata extends MetadataComponent
for (ByteBuffer columnName : component.maxColumnNames)
ByteBufferUtil.writeWithShortLength(columnName, out);
out.writeBoolean(component.hasLegacyCounterShards);
+ if (version.hasCommitLogLowerBound())
+ ReplayPosition.serializer.serialize(component.commitLogLowerBound, out);
}
public StatsMetadata deserialize(Version version, DataInput in) throws IOException
{
EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in);
EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
- ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
+ ReplayPosition commitLogLowerBound = ReplayPosition.NONE, commitLogUpperBound;
+ commitLogUpperBound = ReplayPosition.serializer.deserialize(in);
long minTimestamp = in.readLong();
long maxTimestamp = in.readLong();
int maxLocalDeletionTime = in.readInt();
@@ -264,9 +276,12 @@ public class StatsMetadata extends MetadataComponent
if (version.tracksLegacyCounterShards())
hasLegacyCounterShards = in.readBoolean();
+ if (version.hasCommitLogLowerBound())
+ commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
return new StatsMetadata(rowSizes,
columnCounts,
- replayPosition,
+ commitLogLowerBound,
+ commitLogUpperBound,
minTimestamp,
maxTimestamp,
maxLocalDeletionTime,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java
index 603732b..4ca078b 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java
@@ -71,12 +71,12 @@ public class ValidationMetadata extends MetadataComponent
public static class ValidationMetadataSerializer implements IMetadataComponentSerializer<ValidationMetadata>
{
- public int serializedSize(ValidationMetadata component) throws IOException
+ public int serializedSize(ValidationMetadata component, Version version) throws IOException
{
return TypeSizes.NATIVE.sizeof(component.partitioner) + 8;
}
- public void serialize(ValidationMetadata component, DataOutputPlus out) throws IOException
+ public void serialize(ValidationMetadata component, Version version, DataOutputPlus out) throws IOException
{
out.writeUTF(component.partitioner);
out.writeDouble(component.bloomFilterFPChance);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
deleted file mode 100644
index 1a15d6f..0000000
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.IOException;
-
-import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-public abstract class DiskAwareRunnable extends WrappedRunnable
-{
- protected Directories.DataDirectory getWriteDirectory(long writeSize)
- {
- Directories.DataDirectory directory = getDirectories().getWriteableLocation(writeSize);
- if (directory == null)
- throw new FSWriteError(new IOException("Insufficient disk space to write " + writeSize + " bytes"), "");
-
- return directory;
- }
-
- /**
- * Get sstable directories for the CF.
- * @return Directories instance for the CF.
- */
- protected abstract Directories getDirectories();
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index dd56b8b..6911ec6 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -122,14 +122,11 @@ public class StreamReceiveTask extends StreamTask
for (SSTableWriter writer : task.sstables)
writer.abort();
task.sstables.clear();
- task.session.taskCompleted(task);
return;
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
- if (lockfiledir == null)
- throw new IOError(new IOException("All disks full"));
StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
lockfile.create(task.sstables);
List<SSTableReader> readers = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index 2665f40..8319014 100644
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@ -70,7 +70,8 @@ public class SSTableMetadataViewer
out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000)));
out.printf("SSTable Level: %d%n", stats.sstableLevel);
out.printf("Repaired at: %d%n", stats.repairedAt);
- out.println(stats.replayPosition);
+ out.printf("Minimum replay position: %s\n", stats.commitLogLowerBound);
+ out.printf("Maximum replay position: %s\n", stats.commitLogUpperBound);
out.println("Estimated tombstone drop times:");
for (Map.Entry<Double, Long> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 1b4edee..f9b4156 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -452,7 +452,7 @@ public class CommitLogStressTest
int cells = 0;
@Override
- void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc)
+ void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
{
if (desc.id < discardedPos.segment)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index c377a21..0c46061 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -59,7 +59,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer
}
@Override
- void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc)
+ void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
{
FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
Mutation mutation;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index 93b88b4..adeb778 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.MockSchema;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.SSTableDeletingTask;
@@ -263,15 +264,15 @@ public class TrackerTest
Tracker tracker = cfs.getTracker();
tracker.subscribe(listener);
- Memtable prev1 = tracker.switchMemtable(true);
+ Memtable prev1 = tracker.switchMemtable(true, new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), cfs));
OpOrder.Group write1 = cfs.keyspace.writeOrder.getCurrent();
OpOrder.Barrier barrier1 = cfs.keyspace.writeOrder.newBarrier();
- prev1.setDiscarding(barrier1, new AtomicReference<ReplayPosition>());
+ prev1.setDiscarding(barrier1, new AtomicReference<>(CommitLog.instance.getContext()));
barrier1.issue();
- Memtable prev2 = tracker.switchMemtable(false);
+ Memtable prev2 = tracker.switchMemtable(false, new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), cfs));
OpOrder.Group write2 = cfs.keyspace.writeOrder.getCurrent();
OpOrder.Barrier barrier2 = cfs.keyspace.writeOrder.newBarrier();
- prev2.setDiscarding(barrier2, new AtomicReference<ReplayPosition>());
+ prev2.setDiscarding(barrier2, new AtomicReference<>(CommitLog.instance.getContext()));
barrier2.issue();
Memtable cur = tracker.getView().getCurrentMemtable();
OpOrder.Group writecur = cfs.keyspace.writeOrder.getCurrent();
@@ -297,6 +298,9 @@ public class TrackerTest
SSTableReader reader = MockSchema.sstable(0, 10, false, cfs);
tracker.replaceFlushed(prev2, reader);
Assert.assertEquals(1, tracker.getView().sstables.size());
+ Assert.assertEquals(1, tracker.getView().premature.size());
+ tracker.permitCompactionOfFlushed(reader);
+ Assert.assertEquals(0, tracker.getView().premature.size());
Assert.assertEquals(1, listener.received.size());
Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added);
listener.received.clear();
@@ -307,16 +311,17 @@ public class TrackerTest
tracker = cfs.getTracker();
listener = new MockListener(false);
tracker.subscribe(listener);
- prev1 = tracker.switchMemtable(false);
+ prev1 = tracker.switchMemtable(false, new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), cfs));
tracker.markFlushing(prev1);
reader = MockSchema.sstable(0, 10, true, cfs);
cfs.invalidate(false);
tracker.replaceFlushed(prev1, reader);
+ tracker.permitCompactionOfFlushed(reader);
Assert.assertEquals(0, tracker.getView().sstables.size());
Assert.assertEquals(0, tracker.getView().flushingMemtables.size());
Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
- Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added);
- Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(1)).removed.size());
+ Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(0)).removed.size());
+ Assert.assertEquals(reader, (((SSTableDeletingNotification) listener.received.get(1)).deleting));
DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
index 32a81e2..5706598 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
@@ -208,6 +208,6 @@ public class ViewTest
for (int i = 0 ; i < sstableCount ; i++)
sstables.add(MockSchema.sstable(i, cfs));
return new View(ImmutableList.copyOf(memtables), Collections.<Memtable>emptyList(), Helpers.identityMap(sstables),
- Collections.<SSTableReader>emptySet(), SSTableIntervalTree.build(sstables));
+ Collections.<SSTableReader>emptySet(), Collections.<SSTableReader>emptySet(), SSTableIntervalTree.build(sstables));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 931422b..3bef89e 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -113,7 +113,7 @@ public class CompressedRandomAccessReaderTest
ChannelProxy channel = new ChannelProxy(f);
try
{
- MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
+ MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
SequentialWriter writer = compressed
? new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector)
: SequentialWriter.open(f);
@@ -166,7 +166,7 @@ public class CompressedRandomAccessReaderTest
File metadata = new File(file.getPath() + ".meta");
metadata.deleteOnExit();
- MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
+ MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
try (SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector))
{
writer.write(CONTENT.getBytes());
@@ -251,7 +251,7 @@ public class CompressedRandomAccessReaderTest
File metadata = new File(file.getPath() + ".meta");
metadata.deleteOnExit();
- MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
+ MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
try (SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector))
{
writer.write(CONTENT.getBytes());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 27b866d..43c44fd 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -85,7 +85,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
try
{
- MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
+ MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
byte[] dataPre = new byte[bytesToTest];
byte[] rawPost = new byte[bytesToTest];
try (CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(compressor), sstableMetadataCollector);)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index eda4f17..19fa7c4 100644
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.io.sstable.metadata;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.EnumSet;
@@ -27,12 +28,16 @@ import java.util.Set;
import com.google.common.collect.Sets;
import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.RandomAccessReader;
@@ -45,18 +50,51 @@ public class MetadataSerializerTest
@Test
public void testSerialization() throws IOException
{
+ Map<MetadataType, MetadataComponent> originalMetadata = constructMetadata();
+
+ MetadataSerializer serializer = new MetadataSerializer();
+ File statsFile = serialize(originalMetadata, serializer, BigFormat.latestVersion);
+
+ Descriptor desc = new Descriptor( statsFile.getParentFile(), "", "", 0, Descriptor.Type.FINAL);
+ try (RandomAccessReader in = RandomAccessReader.open(statsFile))
+ {
+ Map<MetadataType, MetadataComponent> deserialized = serializer.deserialize(desc, in, EnumSet.allOf(MetadataType.class));
+
+ for (MetadataType type : MetadataType.values())
+ {
+ assertEquals(originalMetadata.get(type), deserialized.get(type));
+ }
+ }
+ }
+
+ public File serialize(Map<MetadataType, MetadataComponent> metadata, MetadataSerializer serializer, Version version)
+ throws IOException, FileNotFoundException
+ {
+ // Serialize to tmp file
+ File statsFile = File.createTempFile(Component.STATS.name, null);
+ try (DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(statsFile)))
+ {
+ serializer.serialize(metadata, version, out);
+ }
+ return statsFile;
+ }
+
+ public Map<MetadataType, MetadataComponent> constructMetadata()
+ {
EstimatedHistogram rowSizes = new EstimatedHistogram(new long[] { 1L, 2L },
new long[] { 3L, 4L, 5L });
EstimatedHistogram columnCounts = new EstimatedHistogram(new long[] { 6L, 7L },
new long[] { 8L, 9L, 10L });
- ReplayPosition rp = new ReplayPosition(11L, 12);
+ ReplayPosition start = new ReplayPosition(11L, 12);
+ ReplayPosition end = new ReplayPosition(15L, 9);
long minTimestamp = 2162517136L;
long maxTimestamp = 4162517136L;
MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance))
.estimatedRowSize(rowSizes)
.estimatedColumnCount(columnCounts)
- .replayPosition(rp);
+ .commitLogLowerBound(start)
+ .commitLogUpperBound(end);
collector.updateMinTimestamp(minTimestamp);
collector.updateMaxTimestamp(maxTimestamp);
@@ -67,23 +105,35 @@ public class MetadataSerializerTest
String partitioner = RandomPartitioner.class.getCanonicalName();
double bfFpChance = 0.1;
Map<MetadataType, MetadataComponent> originalMetadata = collector.finalizeMetadata(partitioner, bfFpChance, 0);
+ return originalMetadata;
+ }
+
+ @Test
+ public void testLaReadsLb() throws IOException
+ {
+ Map<MetadataType, MetadataComponent> originalMetadata = constructMetadata();
MetadataSerializer serializer = new MetadataSerializer();
- // Serialize to tmp file
- File statsFile = File.createTempFile(Component.STATS.name, null);
- try (DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(statsFile)))
- {
- serializer.serialize(originalMetadata, out);
- }
+ // Write metadata in two minor formats.
+ File statsFileLb = serialize(originalMetadata, serializer, BigFormat.instance.getVersion("lb"));
+ File statsFileLa = serialize(originalMetadata, serializer, BigFormat.instance.getVersion("la"));
- Descriptor desc = new Descriptor( statsFile.getParentFile(), "", "", 0, Descriptor.Type.FINAL);
- try (RandomAccessReader in = RandomAccessReader.open(statsFile))
+ // Reading both as earlier version should yield identical results.
+ Descriptor desc = new Descriptor("la", statsFileLb.getParentFile(), "", "", 0, Descriptor.Type.FINAL, DatabaseDescriptor.getSSTableFormat());
+ try (RandomAccessReader inLb = RandomAccessReader.open(statsFileLb);
+ RandomAccessReader inLa = RandomAccessReader.open(statsFileLa))
{
- Map<MetadataType, MetadataComponent> deserialized = serializer.deserialize(desc, in, EnumSet.allOf(MetadataType.class));
+ Map<MetadataType, MetadataComponent> deserializedLb = serializer.deserialize(desc, inLb, EnumSet.allOf(MetadataType.class));
+ Map<MetadataType, MetadataComponent> deserializedLa = serializer.deserialize(desc, inLa, EnumSet.allOf(MetadataType.class));
for (MetadataType type : MetadataType.values())
{
- assertEquals(originalMetadata.get(type), deserialized.get(type));
+ assertEquals(deserializedLa.get(type), deserializedLb.get(type));
+ if (!originalMetadata.get(type).equals(deserializedLb.get(type)))
+ {
+ // Currently only STATS can be different. Change if no longer the case
+ assertEquals(MetadataType.STATS, type);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java b/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
index 8409a26..1f66fb7 100644
--- a/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
@@ -157,7 +157,7 @@ public class IntervalTreeTest
public String deserialize(DataInput in) throws IOException { return in.readUTF(); }
public long serializedSize(String v, TypeSizes s) { return v.length(); }
},
- (Constructor<Interval<Integer, String>>) (Object) Interval.class.getConstructor(Object.class, Object.class, Object.class)
+ (Constructor<Interval<Integer, String>>) (Constructor<?>) Interval.class.getConstructor(Object.class, Object.class, Object.class)
);
DataOutputBuffer out = new DataOutputBuffer();