You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/07/24 14:40:36 UTC
cassandra git commit: Don't serialize CFMetaData in readResponse
Repository: cassandra
Updated Branches:
refs/heads/trunk 7a6c3272e -> 8a9796902
Don't serialize CFMetaData in readResponse
patch by slebresne; reviewed by JoshuaMcKenzie for CASSANDRA-9847
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8a979690
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8a979690
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8a979690
Branch: refs/heads/trunk
Commit: 8a9796902b923c09694f1396ca4b7fbd9e3b504f
Parents: 7a6c327
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Jul 20 10:14:11 2015 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Jul 24 14:38:42 2015 +0200
----------------------------------------------------------------------
src/java/org/apache/cassandra/db/Memtable.java | 5 ++
.../org/apache/cassandra/db/ReadResponse.java | 33 +++++++---
.../db/compaction/CompactionIterator.java | 8 ++-
.../compaction/LeveledCompactionStrategy.java | 6 ++
.../db/index/composites/CompositesSearcher.java | 8 ++-
.../cassandra/db/index/keys/KeysSearcher.java | 6 ++
.../partitions/ArrayBackedCachedPartition.java | 19 ++++--
.../db/partitions/PartitionUpdate.java | 15 +++--
.../SingletonUnfilteredPartitionIterator.java | 6 ++
.../partitions/UnfilteredPartitionIterator.java | 3 +
.../UnfilteredPartitionIterators.java | 68 +++++++++++++-------
.../WrappingUnfilteredPartitionIterator.java | 6 ++
.../rows/UnfilteredRowIteratorSerializer.java | 30 ++++-----
.../io/sstable/format/big/BigTableScanner.java | 37 +++++------
.../apache/cassandra/service/DataResolver.java | 6 +-
.../cassandra/service/DigestResolver.java | 6 +-
.../cassandra/service/DataResolverTest.java | 6 +-
17 files changed, 178 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index a2d51e1..cd5560e 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -284,6 +284,11 @@ public class Memtable implements Comparable<Memtable>
return isForThrift;
}
+ public CFMetaData metadata()
+ {
+ return cfs.metadata;
+ }
+
public boolean hasNext()
{
return iter.hasNext();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index b9928dc..740423a 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -23,6 +23,7 @@ import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -39,6 +40,15 @@ public abstract class ReadResponse
public static final IVersionedSerializer<ReadResponse> serializer = new Serializer();
public static final IVersionedSerializer<ReadResponse> legacyRangeSliceReplySerializer = new LegacyRangeSliceReplySerializer();
+ // This is used only when serializing data responses and we can't get it easily in other cases. So this can be null, which is slightly
+ // hacky, but as this hack doesn't escape this class, and it's easy enough to validate that it's not null when we need, it's "good enough".
+ private final CFMetaData metadata;
+
+ protected ReadResponse(CFMetaData metadata)
+ {
+ this.metadata = metadata;
+ }
+
public static ReadResponse createDataResponse(UnfilteredPartitionIterator data)
{
return new DataResponse(data);
@@ -49,8 +59,8 @@ public abstract class ReadResponse
return new DigestResponse(makeDigest(data));
}
- public abstract UnfilteredPartitionIterator makeIterator();
- public abstract ByteBuffer digest();
+ public abstract UnfilteredPartitionIterator makeIterator(CFMetaData metadata);
+ public abstract ByteBuffer digest(CFMetaData metadata);
public abstract boolean isDigestQuery();
protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator)
@@ -66,16 +76,17 @@ public abstract class ReadResponse
private DigestResponse(ByteBuffer digest)
{
+ super(null);
assert digest.hasRemaining();
this.digest = digest;
}
- public UnfilteredPartitionIterator makeIterator()
+ public UnfilteredPartitionIterator makeIterator(CFMetaData metadata)
{
throw new UnsupportedOperationException();
}
- public ByteBuffer digest()
+ public ByteBuffer digest(CFMetaData metadata)
{
return digest;
}
@@ -94,12 +105,14 @@ public abstract class ReadResponse
private DataResponse(ByteBuffer data)
{
+ super(null); // This is never called on the serialization side, where we actually care about the metadata.
this.data = data;
this.flag = SerializationHelper.Flag.FROM_REMOTE;
}
private DataResponse(UnfilteredPartitionIterator iter)
{
+ super(iter.metadata());
try (DataOutputBuffer buffer = new DataOutputBuffer())
{
UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, buffer, MessagingService.current_version);
@@ -113,12 +126,12 @@ public abstract class ReadResponse
}
}
- public UnfilteredPartitionIterator makeIterator()
+ public UnfilteredPartitionIterator makeIterator(CFMetaData metadata)
{
try
{
DataInputPlus in = new DataInputBuffer(data, true);
- return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in, MessagingService.current_version, flag);
+ return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in, MessagingService.current_version, metadata, flag);
}
catch (IOException e)
{
@@ -127,9 +140,9 @@ public abstract class ReadResponse
}
}
- public ByteBuffer digest()
+ public ByteBuffer digest(CFMetaData metadata)
{
- try (UnfilteredPartitionIterator iterator = makeIterator())
+ try (UnfilteredPartitionIterator iterator = makeIterator(metadata))
{
return makeDigest(iterator);
}
@@ -152,7 +165,7 @@ public abstract class ReadResponse
}
boolean isDigest = response.isDigestQuery();
- ByteBufferUtil.writeWithShortLength(isDigest ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER, out);
+ ByteBufferUtil.writeWithShortLength(isDigest ? response.digest(response.metadata) : ByteBufferUtil.EMPTY_BYTE_BUFFER, out);
if (!isDigest)
{
// Note that we can only get there if version == 3.0, which is the current_version. When we'll change the
@@ -189,7 +202,7 @@ public abstract class ReadResponse
}
boolean isDigest = response.isDigestQuery();
- long size = ByteBufferUtil.serializedSizeWithShortLength(isDigest ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER);
+ long size = ByteBufferUtil.serializedSizeWithShortLength(isDigest ? response.digest(response.metadata) : ByteBufferUtil.EMPTY_BYTE_BUFFER);
if (!isDigest)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 0149582..a1a9d25 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction;
import java.util.UUID;
import java.util.List;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.index.SecondaryIndexManager;
@@ -93,7 +94,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
metrics.beginCompaction(this);
this.compacted = scanners.isEmpty()
- ? UnfilteredPartitionIterators.EMPTY
+ ? UnfilteredPartitionIterators.empty(controller.cfs.metadata)
: new PurgeIterator(UnfilteredPartitionIterators.merge(scanners, nowInSec, listener()), controller);
}
@@ -102,6 +103,11 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
return false;
}
+ public CFMetaData metadata()
+ {
+ return controller.cfs.metadata;
+ }
+
public CompactionInfo getCompactionInfo()
{
return new CompactionInfo(controller.cfs.metadata,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index dcdfd7f..83f6aba 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -28,6 +28,7 @@ import com.google.common.primitives.Doubles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -341,6 +342,11 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
return false;
}
+ public CFMetaData metadata()
+ {
+ return sstables.get(0).metadata; // The ctor checks we have at least one sstable
+ }
+
protected UnfilteredRowIterator computeNext()
{
if (currentScanner == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index ce92164..d322faf 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -23,6 +23,7 @@ import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
@@ -69,6 +70,11 @@ public class CompositesSearcher extends SecondaryIndexSearcher
return command.isForThrift();
}
+ public CFMetaData metadata()
+ {
+ return command.metadata();
+ }
+
public boolean hasNext()
{
return prepareNext();
@@ -125,7 +131,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
// Query the gathered index hits. We still need to filter stale hits from the resulting query.
ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
- SinglePartitionReadCommand dataCmd = new SinglePartitionNamesCommand(baseCfs.metadata,
+ SinglePartitionReadCommand dataCmd = new SinglePartitionNamesCommand(metadata(),
command.nowInSec(),
command.columnFilter(),
command.rowFilter(),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index 118fb75..bcaf70b 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -23,6 +23,7 @@ import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
@@ -58,6 +59,11 @@ public class KeysSearcher extends SecondaryIndexSearcher
return command.isForThrift();
}
+ public CFMetaData metadata()
+ {
+ return command.metadata();
+ }
+
public boolean hasNext()
{
return prepareNext();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
index f39245b..e2ec06d 100644
--- a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
@@ -203,6 +203,8 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
{
public void serialize(CachedPartition partition, DataOutputPlus out) throws IOException
{
+ int version = MessagingService.current_version;
+
assert partition instanceof ArrayBackedCachedPartition;
ArrayBackedCachedPartition p = (ArrayBackedCachedPartition)partition;
@@ -211,14 +213,17 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
out.writeInt(p.rowsWithNonExpiringCells);
out.writeInt(p.nonTombstoneCellCount);
out.writeInt(p.nonExpiringLiveCells);
+ CFMetaData.serializer.serialize(partition.metadata(), out, version);
try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator())
{
- UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, MessagingService.current_version, p.rowCount());
+ UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, p.rowCount());
}
}
public CachedPartition deserialize(DataInputPlus in) throws IOException
{
+ int version = MessagingService.current_version;
+
// Note that it would be slightly simpler to just do
// ArrayBackedCachedPiartition.create(UnfilteredRowIteratorSerializer.serializer.deserialize(...));
// However deserializing the header separatly is not a lot harder and allows us to:
@@ -232,13 +237,14 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
int nonTombstoneCellCount = in.readInt();
int nonExpiringLiveCells = in.readInt();
- UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, MessagingService.current_version, SerializationHelper.Flag.LOCAL);
+ CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+ UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, metadata, SerializationHelper.Flag.LOCAL);
assert !header.isReversed && header.rowEstimate >= 0;
- MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(header.partitionDeletion, header.metadata.comparator, false);
+ MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(header.partitionDeletion, metadata.comparator, false);
List<Row> rows = new ArrayList<>(header.rowEstimate);
- try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, MessagingService.current_version, SerializationHelper.Flag.LOCAL, header))
+ try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, SerializationHelper.Flag.LOCAL, header))
{
while (partition.hasNext())
{
@@ -250,7 +256,7 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
}
}
- return new ArrayBackedCachedPartition(header.metadata,
+ return new ArrayBackedCachedPartition(metadata,
header.key,
header.sHeader.columns(),
header.staticRow,
@@ -267,6 +273,8 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
public long serializedSize(CachedPartition partition)
{
+ int version = MessagingService.current_version;
+
assert partition instanceof ArrayBackedCachedPartition;
ArrayBackedCachedPartition p = (ArrayBackedCachedPartition)partition;
@@ -277,6 +285,7 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
+ TypeSizes.sizeof(p.rowsWithNonExpiringCells)
+ TypeSizes.sizeof(p.nonTombstoneCellCount)
+ TypeSizes.sizeof(p.nonExpiringLiveCells)
+ + CFMetaData.serializer.serializedSize(partition.metadata(), version)
+ UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, MessagingService.current_version, p.rowCount());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 366828a..c9788e6 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -687,6 +687,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
// assert count == written: "Table had " + count + " columns, but " + written + " written";
}
+ CFMetaData.serializer.serialize(update.metadata(), out, version);
try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator())
{
assert !iter.isReverseOrder();
@@ -717,17 +718,18 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
assert key == null; // key is only there for the old format
- UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, flag);
+ CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+ UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, metadata, flag);
if (header.isEmpty)
- return emptyUpdate(header.metadata, header.key);
+ return emptyUpdate(metadata, header.key);
assert !header.isReversed;
assert header.rowEstimate >= 0;
- MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(header.partitionDeletion, header.metadata.comparator, false);
+ MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(header.partitionDeletion, metadata.comparator, false);
List<Row> rows = new ArrayList<>(header.rowEstimate);
- try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, flag, header))
+ try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, flag, header))
{
while (partition.hasNext())
{
@@ -739,7 +741,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
}
}
- return new PartitionUpdate(header.metadata,
+ return new PartitionUpdate(metadata,
header.key,
header.sHeader.columns(),
header.staticRow,
@@ -770,7 +772,8 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator())
{
- return UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, version, update.rows.size());
+ return CFMetaData.serializer.serializedSize(update.metadata(), version)
+ + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, version, update.rows.size());
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
index e2fec05..bfa6690 100644
--- a/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
+++ b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.partitions;
import java.util.NoSuchElementException;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
public class SingletonUnfilteredPartitionIterator implements UnfilteredPartitionIterator
@@ -38,6 +39,11 @@ public class SingletonUnfilteredPartitionIterator implements UnfilteredPartition
return isForThrift;
}
+ public CFMetaData metadata()
+ {
+ return iter.metadata();
+ }
+
public boolean hasNext()
{
return !returned;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
index 2447da8..10989df 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.partitions;
import java.util.Iterator;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
/**
@@ -42,5 +43,7 @@ public interface UnfilteredPartitionIterator extends Iterator<UnfilteredRowItera
*/
public boolean isForThrift();
+ public CFMetaData metadata();
+
public void close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 4414f44..dd625c4 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -39,24 +39,6 @@ public abstract class UnfilteredPartitionIterators
private static final Comparator<UnfilteredRowIterator> partitionComparator = (p1, p2) -> p1.partitionKey().compareTo(p2.partitionKey());
- public static final UnfilteredPartitionIterator EMPTY = new AbstractUnfilteredPartitionIterator()
- {
- public boolean isForThrift()
- {
- return false;
- }
-
- public boolean hasNext()
- {
- return false;
- }
-
- public UnfilteredRowIterator next()
- {
- throw new NoSuchElementException();
- }
- };
-
private UnfilteredPartitionIterators() {}
public interface MergeListener
@@ -65,6 +47,33 @@ public abstract class UnfilteredPartitionIterators
public void close();
}
+
+ public static UnfilteredPartitionIterator empty(final CFMetaData metadata)
+ {
+ return new AbstractUnfilteredPartitionIterator()
+ {
+ public boolean isForThrift()
+ {
+ return false;
+ }
+
+ public CFMetaData metadata()
+ {
+ return metadata;
+ }
+
+ public boolean hasNext()
+ {
+ return false;
+ }
+
+ public UnfilteredRowIterator next()
+ {
+ throw new NoSuchElementException();
+ }
+ };
+ }
+
@SuppressWarnings("resource") // The created resources are returned right away
public static UnfilteredRowIterator getOnlyElement(final UnfilteredPartitionIterator iter, SinglePartitionReadCommand<?> command)
{
@@ -163,18 +172,17 @@ public abstract class UnfilteredPartitionIterators
assert !iterators.isEmpty();
final boolean isForThrift = iterators.get(0).isForThrift();
+ final CFMetaData metadata = iterators.get(0).metadata();
final MergeIterator<UnfilteredRowIterator, UnfilteredRowIterator> merged = MergeIterator.get(iterators, partitionComparator, new MergeIterator.Reducer<UnfilteredRowIterator, UnfilteredRowIterator>()
{
private final List<UnfilteredRowIterator> toMerge = new ArrayList<>(iterators.size());
- private CFMetaData metadata;
private DecoratedKey partitionKey;
private boolean isReverseOrder;
public void reduce(int idx, UnfilteredRowIterator current)
{
- metadata = current.metadata();
partitionKey = current.partitionKey();
isReverseOrder = current.isReverseOrder();
@@ -210,6 +218,11 @@ public abstract class UnfilteredPartitionIterators
return isForThrift;
}
+ public CFMetaData metadata()
+ {
+ return metadata;
+ }
+
public boolean hasNext()
{
return merged.hasNext();
@@ -236,6 +249,7 @@ public abstract class UnfilteredPartitionIterators
return iterators.get(0);
final boolean isForThrift = iterators.get(0).isForThrift();
+ final CFMetaData metadata = iterators.get(0).metadata();
final MergeIterator<UnfilteredRowIterator, UnfilteredRowIterator> merged = MergeIterator.get(iterators, partitionComparator, new MergeIterator.Reducer<UnfilteredRowIterator, UnfilteredRowIterator>()
{
@@ -276,6 +290,11 @@ public abstract class UnfilteredPartitionIterators
return isForThrift;
}
+ public CFMetaData metadata()
+ {
+ return metadata;
+ }
+
public boolean hasNext()
{
return merged.hasNext();
@@ -353,7 +372,7 @@ public abstract class UnfilteredPartitionIterators
out.writeBoolean(false);
}
- public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final SerializationHelper.Flag flag) throws IOException
+ public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final CFMetaData metadata, final SerializationHelper.Flag flag) throws IOException
{
if (version < MessagingService.VERSION_30)
throw new UnsupportedOperationException();
@@ -371,6 +390,11 @@ public abstract class UnfilteredPartitionIterators
return isForThrift;
}
+ public CFMetaData metadata()
+ {
+ return metadata;
+ }
+
public boolean hasNext()
{
if (!nextReturned)
@@ -401,7 +425,7 @@ public abstract class UnfilteredPartitionIterators
try
{
nextReturned = true;
- next = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, flag);
+ next = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, flag);
return next;
}
catch (IOException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java
index 4f35075..ebf3c28 100644
--- a/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java
+++ b/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.db.partitions;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
@@ -44,6 +45,11 @@ public abstract class WrappingUnfilteredPartitionIterator extends AbstractUnfilt
return wrapped.isForThrift();
}
+ public CFMetaData metadata()
+ {
+ return wrapped.metadata();
+ }
+
public boolean hasNext()
{
prepareNext();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index 129ed50..c998964 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -89,7 +89,6 @@ public class UnfilteredRowIteratorSerializer
// Should only be used for the on-wire format.
public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, SerializationHeader header, int version, int rowEstimate) throws IOException
{
- CFMetaData.serializer.serialize(iterator.metadata(), out, version);
ByteBufferUtil.writeWithLength(iterator.partitionKey().getKey(), out);
int flags = 0;
@@ -141,8 +140,7 @@ public class UnfilteredRowIteratorSerializer
assert rowEstimate >= 0;
- long size = CFMetaData.serializer.serializedSize(iterator.metadata(), version)
- + TypeSizes.sizeofWithLength(iterator.partitionKey().getKey())
+ long size = TypeSizes.sizeofWithLength(iterator.partitionKey().getKey())
+ 1; // flags
if (iterator.isEmpty())
@@ -170,16 +168,15 @@ public class UnfilteredRowIteratorSerializer
return size;
}
- public Header deserializeHeader(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
+ public Header deserializeHeader(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag) throws IOException
{
- CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithLength(in));
int flags = in.readUnsignedByte();
boolean isReversed = (flags & IS_REVERSED) != 0;
if ((flags & IS_EMPTY) != 0)
{
SerializationHeader sh = new SerializationHeader(metadata, PartitionColumns.NONE, RowStats.NO_STATS);
- return new Header(sh, metadata, key, isReversed, true, null, null, 0);
+ return new Header(sh, key, isReversed, true, null, null, 0);
}
boolean hasPartitionDeletion = (flags & HAS_PARTITION_DELETION) != 0;
@@ -195,17 +192,17 @@ public class UnfilteredRowIteratorSerializer
staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new SerializationHelper(metadata, version, flag));
int rowEstimate = hasRowEstimate ? (int)in.readVInt() : -1;
- return new Header(header, metadata, key, isReversed, false, partitionDeletion, staticRow, rowEstimate);
+ return new Header(header, key, isReversed, false, partitionDeletion, staticRow, rowEstimate);
}
- public UnfilteredRowIterator deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, Header header) throws IOException
+ public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag, Header header) throws IOException
{
if (header.isEmpty)
- return UnfilteredRowIterators.emptyIterator(header.metadata, header.key, header.isReversed);
+ return UnfilteredRowIterators.emptyIterator(metadata, header.key, header.isReversed);
- final SerializationHelper helper = new SerializationHelper(header.metadata, version, flag);
+ final SerializationHelper helper = new SerializationHelper(metadata, version, flag);
final SerializationHeader sHeader = header.sHeader;
- return new AbstractUnfilteredRowIterator(header.metadata, header.key, header.partitionDeletion, sHeader.columns(), header.staticRow, header.isReversed, sHeader.stats())
+ return new AbstractUnfilteredRowIterator(metadata, header.key, header.partitionDeletion, sHeader.columns(), header.staticRow, header.isReversed, sHeader.stats())
{
private final Row.Builder builder = ArrayBackedRow.sortedBuilder(sHeader.columns().regulars);
@@ -224,9 +221,9 @@ public class UnfilteredRowIteratorSerializer
};
}
- public UnfilteredRowIterator deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
+ public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag) throws IOException
{
- return deserialize(in, version, flag, deserializeHeader(in, version, flag));
+ return deserialize(in, version, metadata, flag, deserializeHeader(in, version, metadata, flag));
}
public static void writeDelTime(DeletionTime dt, SerializationHeader header, DataOutputPlus out) throws IOException
@@ -257,7 +254,6 @@ public class UnfilteredRowIteratorSerializer
public static class Header
{
public final SerializationHeader sHeader;
- public final CFMetaData metadata;
public final DecoratedKey key;
public final boolean isReversed;
public final boolean isEmpty;
@@ -266,7 +262,6 @@ public class UnfilteredRowIteratorSerializer
public final int rowEstimate; // -1 if no estimate
private Header(SerializationHeader sHeader,
- CFMetaData metadata,
DecoratedKey key,
boolean isReversed,
boolean isEmpty,
@@ -275,7 +270,6 @@ public class UnfilteredRowIteratorSerializer
int rowEstimate)
{
this.sHeader = sHeader;
- this.metadata = metadata;
this.key = key;
this.isReversed = isReversed;
this.isEmpty = isEmpty;
@@ -287,8 +281,8 @@ public class UnfilteredRowIteratorSerializer
@Override
public String toString()
{
- return String.format("{header=%s, table=%s.%s, key=%s, isReversed=%b, isEmpty=%b, del=%s, staticRow=%s, rowEstimate=%d}",
- sHeader, metadata.ksName, metadata.cfName, key, isReversed, isEmpty, partitionDeletion, staticRow.toString(metadata), rowEstimate);
+ return String.format("{header=%s, key=%s, isReversed=%b, isEmpty=%b, del=%s, staticRow=%s, rowEstimate=%d}",
+ sHeader, key, isReversed, isEmpty, partitionDeletion, staticRow, rowEstimate);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 0794e90..0451a98 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -25,6 +25,7 @@ import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.RateLimiter;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.filter.*;
@@ -62,7 +63,7 @@ public class BigTableScanner implements ISSTableScanner
private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
private final boolean isForThrift;
- protected UnfilteredPartitionIterator iterator;
+ protected Iterator<UnfilteredRowIterator> iterator;
// Full scan of the sstables
public static ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter)
@@ -80,7 +81,7 @@ public class BigTableScanner implements ISSTableScanner
// We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249)
List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(tokenRanges);
if (positions.isEmpty())
- return new EmptySSTableScanner(sstable.getFilename());
+ return new EmptySSTableScanner(sstable);
return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, limiter, false, makeBounds(sstable, tokenRanges).iterator());
}
@@ -227,6 +228,11 @@ public class BigTableScanner implements ISSTableScanner
return isForThrift;
}
+ public CFMetaData metadata()
+ {
+ return sstable.metadata;
+ }
+
public boolean hasNext()
{
if (iterator == null)
@@ -246,23 +252,18 @@ public class BigTableScanner implements ISSTableScanner
throw new UnsupportedOperationException();
}
- private UnfilteredPartitionIterator createIterator()
+ private Iterator<UnfilteredRowIterator> createIterator()
{
return new KeyScanningIterator();
}
- protected class KeyScanningIterator extends AbstractIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator
+ protected class KeyScanningIterator extends AbstractIterator<UnfilteredRowIterator>
{
private DecoratedKey nextKey;
private RowIndexEntry nextEntry;
private DecoratedKey currentKey;
private RowIndexEntry currentEntry;
- public boolean isForThrift()
- {
- return isForThrift;
- }
-
protected UnfilteredRowIterator computeNext()
{
try
@@ -345,11 +346,6 @@ public class BigTableScanner implements ISSTableScanner
throw new CorruptSSTableException(e, sstable.getFilename());
}
}
-
- public void close()
- {
- BigTableScanner.this.close();
- }
}
@Override
@@ -364,11 +360,11 @@ public class BigTableScanner implements ISSTableScanner
public static class EmptySSTableScanner extends AbstractUnfilteredPartitionIterator implements ISSTableScanner
{
- private final String filename;
+ private final SSTableReader sstable;
- public EmptySSTableScanner(String filename)
+ public EmptySSTableScanner(SSTableReader sstable)
{
- this.filename = filename;
+ this.sstable = sstable;
}
public long getLengthInBytes()
@@ -383,7 +379,7 @@ public class BigTableScanner implements ISSTableScanner
public String getBackingFiles()
{
- return filename;
+ return sstable.getFilename();
}
public boolean isForThrift()
@@ -391,6 +387,11 @@ public class BigTableScanner implements ISSTableScanner
return false;
}
+ public CFMetaData metadata()
+ {
+ return sstable.metadata;
+ }
+
public boolean hasNext()
{
return false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index a9024e3..515a0d8 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -49,7 +49,7 @@ public class DataResolver extends ResponseResolver
public PartitionIterator getData()
{
ReadResponse response = responses.iterator().next().payload;
- return UnfilteredPartitionIterators.filter(response.makeIterator(), command.nowInSec());
+ return UnfilteredPartitionIterators.filter(response.makeIterator(command.metadata()), command.nowInSec());
}
public PartitionIterator resolve()
@@ -62,7 +62,7 @@ public class DataResolver extends ResponseResolver
for (int i = 0; i < count; i++)
{
MessageIn<ReadResponse> msg = responses.get(i);
- iters.add(msg.payload.makeIterator());
+ iters.add(msg.payload.makeIterator(command.metadata()));
sources[i] = msg.from;
}
@@ -411,7 +411,7 @@ public class DataResolver extends ResponseResolver
// We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
handler.awaitResults();
assert resolver.responses.size() == 1;
- return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(), retryCommand);
+ return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command.metadata()), retryCommand);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
index 12b0626..42aee04 100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -48,7 +48,7 @@ public class DigestResolver extends ResponseResolver
public PartitionIterator getData()
{
assert isDataPresent();
- return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(), command.nowInSec());
+ return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata()), command.nowInSec());
}
/*
@@ -77,7 +77,7 @@ public class DigestResolver extends ResponseResolver
{
ReadResponse response = message.payload;
- ByteBuffer newDigest = response.digest();
+ ByteBuffer newDigest = response.digest(command.metadata());
if (digest == null)
digest = newDigest;
else if (!digest.equals(newDigest))
@@ -88,7 +88,7 @@ public class DigestResolver extends ResponseResolver
if (logger.isDebugEnabled())
logger.debug("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
- return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(), command.nowInSec());
+ return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata()), command.nowInSec());
}
public boolean isDataPresent()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index dd57a96..efd3504 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -306,7 +306,7 @@ public class DataResolverTest
.add("c2", "v2")
.buildUpdate())));
InetAddress peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, UnfilteredPartitionIterators.EMPTY));
+ resolver.preprocess(readResponseMessage(peer2, UnfilteredPartitionIterators.empty(cfm)));
try(PartitionIterator data = resolver.resolve();
RowIterator rows = Iterators.getOnlyElement(data))
@@ -328,8 +328,8 @@ public class DataResolverTest
public void testResolveWithBothEmpty()
{
DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
- resolver.preprocess(readResponseMessage(peer(), UnfilteredPartitionIterators.EMPTY));
- resolver.preprocess(readResponseMessage(peer(), UnfilteredPartitionIterators.EMPTY));
+ resolver.preprocess(readResponseMessage(peer(), UnfilteredPartitionIterators.empty(cfm)));
+ resolver.preprocess(readResponseMessage(peer(), UnfilteredPartitionIterators.empty(cfm)));
try(PartitionIterator data = resolver.resolve())
{