You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/07/24 14:40:36 UTC

cassandra git commit: Don't serialize CFMetaData in readResponse

Repository: cassandra
Updated Branches:
  refs/heads/trunk 7a6c3272e -> 8a9796902


Don't serialize CFMetaData in readResponse

patch by slebresne; reviewed by JoshuaMcKenzie for CASSANDRA-9847


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8a979690
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8a979690
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8a979690

Branch: refs/heads/trunk
Commit: 8a9796902b923c09694f1396ca4b7fbd9e3b504f
Parents: 7a6c327
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Jul 20 10:14:11 2015 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Jul 24 14:38:42 2015 +0200

----------------------------------------------------------------------
 src/java/org/apache/cassandra/db/Memtable.java  |  5 ++
 .../org/apache/cassandra/db/ReadResponse.java   | 33 +++++++---
 .../db/compaction/CompactionIterator.java       |  8 ++-
 .../compaction/LeveledCompactionStrategy.java   |  6 ++
 .../db/index/composites/CompositesSearcher.java |  8 ++-
 .../cassandra/db/index/keys/KeysSearcher.java   |  6 ++
 .../partitions/ArrayBackedCachedPartition.java  | 19 ++++--
 .../db/partitions/PartitionUpdate.java          | 15 +++--
 .../SingletonUnfilteredPartitionIterator.java   |  6 ++
 .../partitions/UnfilteredPartitionIterator.java |  3 +
 .../UnfilteredPartitionIterators.java           | 68 +++++++++++++-------
 .../WrappingUnfilteredPartitionIterator.java    |  6 ++
 .../rows/UnfilteredRowIteratorSerializer.java   | 30 ++++-----
 .../io/sstable/format/big/BigTableScanner.java  | 37 +++++------
 .../apache/cassandra/service/DataResolver.java  |  6 +-
 .../cassandra/service/DigestResolver.java       |  6 +-
 .../cassandra/service/DataResolverTest.java     |  6 +-
 17 files changed, 178 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index a2d51e1..cd5560e 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -284,6 +284,11 @@ public class Memtable implements Comparable<Memtable>
                 return isForThrift;
             }
 
+            public CFMetaData metadata()
+            {
+                return cfs.metadata;
+            }
+
             public boolean hasNext()
             {
                 return iter.hasNext();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index b9928dc..740423a 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -23,6 +23,7 @@ import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -39,6 +40,15 @@ public abstract class ReadResponse
     public static final IVersionedSerializer<ReadResponse> serializer = new Serializer();
     public static final IVersionedSerializer<ReadResponse> legacyRangeSliceReplySerializer = new LegacyRangeSliceReplySerializer();
 
+    // This is used only when serializing data responses and we can't get it easily in other cases. So this can be null, which is slightly
+    // hacky, but as this hack doesn't escape this class, and it's easy enough to validate that it's not null when we need, it's "good enough".
+    private final CFMetaData metadata;
+
+    protected ReadResponse(CFMetaData metadata)
+    {
+        this.metadata = metadata;
+    }
+
     public static ReadResponse createDataResponse(UnfilteredPartitionIterator data)
     {
         return new DataResponse(data);
@@ -49,8 +59,8 @@ public abstract class ReadResponse
         return new DigestResponse(makeDigest(data));
     }
 
-    public abstract UnfilteredPartitionIterator makeIterator();
-    public abstract ByteBuffer digest();
+    public abstract UnfilteredPartitionIterator makeIterator(CFMetaData metadata);
+    public abstract ByteBuffer digest(CFMetaData metadata);
     public abstract boolean isDigestQuery();
 
     protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator)
@@ -66,16 +76,17 @@ public abstract class ReadResponse
 
         private DigestResponse(ByteBuffer digest)
         {
+            super(null);
             assert digest.hasRemaining();
             this.digest = digest;
         }
 
-        public UnfilteredPartitionIterator makeIterator()
+        public UnfilteredPartitionIterator makeIterator(CFMetaData metadata)
         {
             throw new UnsupportedOperationException();
         }
 
-        public ByteBuffer digest()
+        public ByteBuffer digest(CFMetaData metadata)
         {
             return digest;
         }
@@ -94,12 +105,14 @@ public abstract class ReadResponse
 
         private DataResponse(ByteBuffer data)
         {
+            super(null); // This is never called on the serialization side, where we actually care about the metadata.
             this.data = data;
             this.flag = SerializationHelper.Flag.FROM_REMOTE;
         }
 
         private DataResponse(UnfilteredPartitionIterator iter)
         {
+            super(iter.metadata());
             try (DataOutputBuffer buffer = new DataOutputBuffer())
             {
                 UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, buffer, MessagingService.current_version);
@@ -113,12 +126,12 @@ public abstract class ReadResponse
             }
         }
 
-        public UnfilteredPartitionIterator makeIterator()
+        public UnfilteredPartitionIterator makeIterator(CFMetaData metadata)
         {
             try
             {
                 DataInputPlus in = new DataInputBuffer(data, true);
-                return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in, MessagingService.current_version, flag);
+                return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in, MessagingService.current_version, metadata, flag);
             }
             catch (IOException e)
             {
@@ -127,9 +140,9 @@ public abstract class ReadResponse
             }
         }
 
-        public ByteBuffer digest()
+        public ByteBuffer digest(CFMetaData metadata)
         {
-            try (UnfilteredPartitionIterator iterator = makeIterator())
+            try (UnfilteredPartitionIterator iterator = makeIterator(metadata))
             {
                 return makeDigest(iterator);
             }
@@ -152,7 +165,7 @@ public abstract class ReadResponse
             }
 
             boolean isDigest = response.isDigestQuery();
-            ByteBufferUtil.writeWithShortLength(isDigest ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER, out);
+            ByteBufferUtil.writeWithShortLength(isDigest ? response.digest(response.metadata) : ByteBufferUtil.EMPTY_BYTE_BUFFER, out);
             if (!isDigest)
             {
                 // Note that we can only get there if version == 3.0, which is the current_version. When we'll change the
@@ -189,7 +202,7 @@ public abstract class ReadResponse
             }
 
             boolean isDigest = response.isDigestQuery();
-            long size = ByteBufferUtil.serializedSizeWithShortLength(isDigest ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER);
+            long size = ByteBufferUtil.serializedSizeWithShortLength(isDigest ? response.digest(response.metadata) : ByteBufferUtil.EMPTY_BYTE_BUFFER);
 
             if (!isDigest)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 0149582..a1a9d25 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction;
 import java.util.UUID;
 import java.util.List;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
@@ -93,7 +94,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
             metrics.beginCompaction(this);
 
         this.compacted = scanners.isEmpty()
-                       ? UnfilteredPartitionIterators.EMPTY
+                       ? UnfilteredPartitionIterators.empty(controller.cfs.metadata)
                        : new PurgeIterator(UnfilteredPartitionIterators.merge(scanners, nowInSec, listener()), controller);
     }
 
@@ -102,6 +103,11 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
         return false;
     }
 
+    public CFMetaData metadata()
+    {
+        return controller.cfs.metadata;
+    }
+
     public CompactionInfo getCompactionInfo()
     {
         return new CompactionInfo(controller.cfs.metadata,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index dcdfd7f..83f6aba 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -28,6 +28,7 @@ import com.google.common.primitives.Doubles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -341,6 +342,11 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
             return false;
         }
 
+        public CFMetaData metadata()
+        {
+            return sstables.get(0).metadata; // The ctor checks we have at least one sstable
+        }
+
         protected UnfilteredRowIterator computeNext()
         {
             if (currentScanner == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index ce92164..d322faf 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -23,6 +23,7 @@ import java.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
@@ -69,6 +70,11 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                 return command.isForThrift();
             }
 
+            public CFMetaData metadata()
+            {
+                return command.metadata();
+            }
+
             public boolean hasNext()
             {
                 return prepareNext();
@@ -125,7 +131,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
 
                 // Query the gathered index hits. We still need to filter stale hits from the resulting query.
                 ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
-                SinglePartitionReadCommand dataCmd = new SinglePartitionNamesCommand(baseCfs.metadata,
+                SinglePartitionReadCommand dataCmd = new SinglePartitionNamesCommand(metadata(),
                                                                                      command.nowInSec(),
                                                                                      command.columnFilter(),
                                                                                      command.rowFilter(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index 118fb75..bcaf70b 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -23,6 +23,7 @@ import java.util.Set;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
@@ -58,6 +59,11 @@ public class KeysSearcher extends SecondaryIndexSearcher
                 return command.isForThrift();
             }
 
+            public CFMetaData metadata()
+            {
+                return command.metadata();
+            }
+
             public boolean hasNext()
             {
                 return prepareNext();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
index f39245b..e2ec06d 100644
--- a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
@@ -203,6 +203,8 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
     {
         public void serialize(CachedPartition partition, DataOutputPlus out) throws IOException
         {
+            int version = MessagingService.current_version;
+
             assert partition instanceof ArrayBackedCachedPartition;
             ArrayBackedCachedPartition p = (ArrayBackedCachedPartition)partition;
 
@@ -211,14 +213,17 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
             out.writeInt(p.rowsWithNonExpiringCells);
             out.writeInt(p.nonTombstoneCellCount);
             out.writeInt(p.nonExpiringLiveCells);
+            CFMetaData.serializer.serialize(partition.metadata(), out, version);
             try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator())
             {
-                UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, MessagingService.current_version, p.rowCount());
+                UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, p.rowCount());
             }
         }
 
         public CachedPartition deserialize(DataInputPlus in) throws IOException
         {
+            int version = MessagingService.current_version;
+
             // Note that it would be slightly simpler to just do
             //   ArrayBackedCachedPiartition.create(UnfilteredRowIteratorSerializer.serializer.deserialize(...));
             // However deserializing the header separatly is not a lot harder and allows us to:
@@ -232,13 +237,14 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
             int nonTombstoneCellCount = in.readInt();
             int nonExpiringLiveCells = in.readInt();
 
-            UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, MessagingService.current_version, SerializationHelper.Flag.LOCAL);
+            CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+            UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, metadata, SerializationHelper.Flag.LOCAL);
             assert !header.isReversed && header.rowEstimate >= 0;
 
-            MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(header.partitionDeletion, header.metadata.comparator, false);
+            MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(header.partitionDeletion, metadata.comparator, false);
             List<Row> rows = new ArrayList<>(header.rowEstimate);
 
-            try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, MessagingService.current_version, SerializationHelper.Flag.LOCAL, header))
+            try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, SerializationHelper.Flag.LOCAL, header))
             {
                 while (partition.hasNext())
                 {
@@ -250,7 +256,7 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
                 }
             }
 
-            return new ArrayBackedCachedPartition(header.metadata,
+            return new ArrayBackedCachedPartition(metadata,
                                                   header.key,
                                                   header.sHeader.columns(),
                                                   header.staticRow,
@@ -267,6 +273,8 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
 
         public long serializedSize(CachedPartition partition)
         {
+            int version = MessagingService.current_version;
+
             assert partition instanceof ArrayBackedCachedPartition;
             ArrayBackedCachedPartition p = (ArrayBackedCachedPartition)partition;
 
@@ -277,6 +285,7 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
                      + TypeSizes.sizeof(p.rowsWithNonExpiringCells)
                      + TypeSizes.sizeof(p.nonTombstoneCellCount)
                      + TypeSizes.sizeof(p.nonExpiringLiveCells)
+                     + CFMetaData.serializer.serializedSize(partition.metadata(), version)
                      + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, MessagingService.current_version, p.rowCount());
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 366828a..c9788e6 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -687,6 +687,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
                 // assert count == written: "Table had " + count + " columns, but " + written + " written";
             }
 
+            CFMetaData.serializer.serialize(update.metadata(), out, version);
             try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator())
             {
                 assert !iter.isReverseOrder();
@@ -717,17 +718,18 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
 
             assert key == null; // key is only there for the old format
 
-            UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, flag);
+            CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+            UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, metadata, flag);
             if (header.isEmpty)
-                return emptyUpdate(header.metadata, header.key);
+                return emptyUpdate(metadata, header.key);
 
             assert !header.isReversed;
             assert header.rowEstimate >= 0;
 
-            MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(header.partitionDeletion, header.metadata.comparator, false);
+            MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(header.partitionDeletion, metadata.comparator, false);
             List<Row> rows = new ArrayList<>(header.rowEstimate);
 
-            try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, flag, header))
+            try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, flag, header))
             {
                 while (partition.hasNext())
                 {
@@ -739,7 +741,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
                 }
             }
 
-            return new PartitionUpdate(header.metadata,
+            return new PartitionUpdate(metadata,
                                        header.key,
                                        header.sHeader.columns(),
                                        header.staticRow,
@@ -770,7 +772,8 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
 
             try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator())
             {
-                return UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, version, update.rows.size());
+                return CFMetaData.serializer.serializedSize(update.metadata(), version)
+                     + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, version, update.rows.size());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
index e2fec05..bfa6690 100644
--- a/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
+++ b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.partitions;
 
 import java.util.NoSuchElementException;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 
 public class SingletonUnfilteredPartitionIterator implements UnfilteredPartitionIterator
@@ -38,6 +39,11 @@ public class SingletonUnfilteredPartitionIterator implements UnfilteredPartition
         return isForThrift;
     }
 
+    public CFMetaData metadata()
+    {
+        return iter.metadata();
+    }
+
     public boolean hasNext()
     {
         return !returned;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
index 2447da8..10989df 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.partitions;
 
 import java.util.Iterator;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 
 /**
@@ -42,5 +43,7 @@ public interface UnfilteredPartitionIterator extends Iterator<UnfilteredRowItera
      */
     public boolean isForThrift();
 
+    public CFMetaData metadata();
+
     public void close();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 4414f44..dd625c4 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -39,24 +39,6 @@ public abstract class UnfilteredPartitionIterators
 
     private static final Comparator<UnfilteredRowIterator> partitionComparator = (p1, p2) -> p1.partitionKey().compareTo(p2.partitionKey());
 
-    public static final UnfilteredPartitionIterator EMPTY = new AbstractUnfilteredPartitionIterator()
-    {
-        public boolean isForThrift()
-        {
-            return false;
-        }
-
-        public boolean hasNext()
-        {
-            return false;
-        }
-
-        public UnfilteredRowIterator next()
-        {
-            throw new NoSuchElementException();
-        }
-    };
-
     private UnfilteredPartitionIterators() {}
 
     public interface MergeListener
@@ -65,6 +47,33 @@ public abstract class UnfilteredPartitionIterators
         public void close();
     }
 
+
+    public static UnfilteredPartitionIterator empty(final CFMetaData metadata)
+    {
+        return new AbstractUnfilteredPartitionIterator()
+        {
+            public boolean isForThrift()
+            {
+                return false;
+            }
+
+            public CFMetaData metadata()
+            {
+                return metadata;
+            }
+
+            public boolean hasNext()
+            {
+                return false;
+            }
+
+            public UnfilteredRowIterator next()
+            {
+                throw new NoSuchElementException();
+            }
+        };
+    }
+
     @SuppressWarnings("resource") // The created resources are returned right away
     public static UnfilteredRowIterator getOnlyElement(final UnfilteredPartitionIterator iter, SinglePartitionReadCommand<?> command)
     {
@@ -163,18 +172,17 @@ public abstract class UnfilteredPartitionIterators
         assert !iterators.isEmpty();
 
         final boolean isForThrift = iterators.get(0).isForThrift();
+        final CFMetaData metadata = iterators.get(0).metadata();
 
         final MergeIterator<UnfilteredRowIterator, UnfilteredRowIterator> merged = MergeIterator.get(iterators, partitionComparator, new MergeIterator.Reducer<UnfilteredRowIterator, UnfilteredRowIterator>()
         {
             private final List<UnfilteredRowIterator> toMerge = new ArrayList<>(iterators.size());
 
-            private CFMetaData metadata;
             private DecoratedKey partitionKey;
             private boolean isReverseOrder;
 
             public void reduce(int idx, UnfilteredRowIterator current)
             {
-                metadata = current.metadata();
                 partitionKey = current.partitionKey();
                 isReverseOrder = current.isReverseOrder();
 
@@ -210,6 +218,11 @@ public abstract class UnfilteredPartitionIterators
                 return isForThrift;
             }
 
+            public CFMetaData metadata()
+            {
+                return metadata;
+            }
+
             public boolean hasNext()
             {
                 return merged.hasNext();
@@ -236,6 +249,7 @@ public abstract class UnfilteredPartitionIterators
             return iterators.get(0);
 
         final boolean isForThrift = iterators.get(0).isForThrift();
+        final CFMetaData metadata = iterators.get(0).metadata();
 
         final MergeIterator<UnfilteredRowIterator, UnfilteredRowIterator> merged = MergeIterator.get(iterators, partitionComparator, new MergeIterator.Reducer<UnfilteredRowIterator, UnfilteredRowIterator>()
         {
@@ -276,6 +290,11 @@ public abstract class UnfilteredPartitionIterators
                 return isForThrift;
             }
 
+            public CFMetaData metadata()
+            {
+                return metadata;
+            }
+
             public boolean hasNext()
             {
                 return merged.hasNext();
@@ -353,7 +372,7 @@ public abstract class UnfilteredPartitionIterators
             out.writeBoolean(false);
         }
 
-        public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final SerializationHelper.Flag flag) throws IOException
+        public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final CFMetaData metadata, final SerializationHelper.Flag flag) throws IOException
         {
             if (version < MessagingService.VERSION_30)
                 throw new UnsupportedOperationException();
@@ -371,6 +390,11 @@ public abstract class UnfilteredPartitionIterators
                     return isForThrift;
                 }
 
+                public CFMetaData metadata()
+                {
+                    return metadata;
+                }
+
                 public boolean hasNext()
                 {
                     if (!nextReturned)
@@ -401,7 +425,7 @@ public abstract class UnfilteredPartitionIterators
                     try
                     {
                         nextReturned = true;
-                        next = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, flag);
+                        next = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, flag);
                         return next;
                     }
                     catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java
index 4f35075..ebf3c28 100644
--- a/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java
+++ b/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.db.partitions;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 
@@ -44,6 +45,11 @@ public abstract class WrappingUnfilteredPartitionIterator extends AbstractUnfilt
         return wrapped.isForThrift();
     }
 
+    public CFMetaData metadata()
+    {
+        return wrapped.metadata();
+    }
+
     public boolean hasNext()
     {
         prepareNext();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index 129ed50..c998964 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -89,7 +89,6 @@ public class UnfilteredRowIteratorSerializer
     // Should only be used for the on-wire format.
     public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, SerializationHeader header, int version, int rowEstimate) throws IOException
     {
-        CFMetaData.serializer.serialize(iterator.metadata(), out, version);
         ByteBufferUtil.writeWithLength(iterator.partitionKey().getKey(), out);
 
         int flags = 0;
@@ -141,8 +140,7 @@ public class UnfilteredRowIteratorSerializer
 
         assert rowEstimate >= 0;
 
-        long size = CFMetaData.serializer.serializedSize(iterator.metadata(), version)
-                  + TypeSizes.sizeofWithLength(iterator.partitionKey().getKey())
+        long size = TypeSizes.sizeofWithLength(iterator.partitionKey().getKey())
                   + 1; // flags
 
         if (iterator.isEmpty())
@@ -170,16 +168,15 @@ public class UnfilteredRowIteratorSerializer
         return size;
     }
 
-    public Header deserializeHeader(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
+    public Header deserializeHeader(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag) throws IOException
     {
-        CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
         DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithLength(in));
         int flags = in.readUnsignedByte();
         boolean isReversed = (flags & IS_REVERSED) != 0;
         if ((flags & IS_EMPTY) != 0)
         {
             SerializationHeader sh = new SerializationHeader(metadata, PartitionColumns.NONE, RowStats.NO_STATS);
-            return new Header(sh, metadata, key, isReversed, true, null, null, 0);
+            return new Header(sh, key, isReversed, true, null, null, 0);
         }
 
         boolean hasPartitionDeletion = (flags & HAS_PARTITION_DELETION) != 0;
@@ -195,17 +192,17 @@ public class UnfilteredRowIteratorSerializer
             staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new SerializationHelper(metadata, version, flag));
 
         int rowEstimate = hasRowEstimate ? (int)in.readVInt() : -1;
-        return new Header(header, metadata, key, isReversed, false, partitionDeletion, staticRow, rowEstimate);
+        return new Header(header, key, isReversed, false, partitionDeletion, staticRow, rowEstimate);
     }
 
-    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, Header header) throws IOException
+    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag, Header header) throws IOException
     {
         if (header.isEmpty)
-            return UnfilteredRowIterators.emptyIterator(header.metadata, header.key, header.isReversed);
+            return UnfilteredRowIterators.emptyIterator(metadata, header.key, header.isReversed);
 
-        final SerializationHelper helper = new SerializationHelper(header.metadata, version, flag);
+        final SerializationHelper helper = new SerializationHelper(metadata, version, flag);
         final SerializationHeader sHeader = header.sHeader;
-        return new AbstractUnfilteredRowIterator(header.metadata, header.key, header.partitionDeletion, sHeader.columns(), header.staticRow, header.isReversed, sHeader.stats())
+        return new AbstractUnfilteredRowIterator(metadata, header.key, header.partitionDeletion, sHeader.columns(), header.staticRow, header.isReversed, sHeader.stats())
         {
             private final Row.Builder builder = ArrayBackedRow.sortedBuilder(sHeader.columns().regulars);
 
@@ -224,9 +221,9 @@ public class UnfilteredRowIteratorSerializer
         };
     }
 
-    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
+    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag) throws IOException
     {
-        return deserialize(in, version, flag,  deserializeHeader(in, version, flag));
+        return deserialize(in, version, metadata, flag, deserializeHeader(in, version, metadata, flag));
     }
 
     public static void writeDelTime(DeletionTime dt, SerializationHeader header, DataOutputPlus out) throws IOException
@@ -257,7 +254,6 @@ public class UnfilteredRowIteratorSerializer
     public static class Header
     {
         public final SerializationHeader sHeader;
-        public final CFMetaData metadata;
         public final DecoratedKey key;
         public final boolean isReversed;
         public final boolean isEmpty;
@@ -266,7 +262,6 @@ public class UnfilteredRowIteratorSerializer
         public final int rowEstimate; // -1 if no estimate
 
         private Header(SerializationHeader sHeader,
-                       CFMetaData metadata,
                        DecoratedKey key,
                        boolean isReversed,
                        boolean isEmpty,
@@ -275,7 +270,6 @@ public class UnfilteredRowIteratorSerializer
                        int rowEstimate)
         {
             this.sHeader = sHeader;
-            this.metadata = metadata;
             this.key = key;
             this.isReversed = isReversed;
             this.isEmpty = isEmpty;
@@ -287,8 +281,8 @@ public class UnfilteredRowIteratorSerializer
         @Override
         public String toString()
         {
-            return String.format("{header=%s, table=%s.%s, key=%s, isReversed=%b, isEmpty=%b, del=%s, staticRow=%s, rowEstimate=%d}",
-                                 sHeader, metadata.ksName, metadata.cfName, key, isReversed, isEmpty, partitionDeletion, staticRow.toString(metadata), rowEstimate);
+            return String.format("{header=%s, key=%s, isReversed=%b, isEmpty=%b, del=%s, staticRow=%s, rowEstimate=%d}",
+                                 sHeader, key, isReversed, isEmpty, partitionDeletion, staticRow, rowEstimate);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 0794e90..0451a98 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -25,6 +25,7 @@ import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterators;
 import com.google.common.util.concurrent.RateLimiter;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.*;
@@ -62,7 +63,7 @@ public class BigTableScanner implements ISSTableScanner
     private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
     private final boolean isForThrift;
 
-    protected UnfilteredPartitionIterator iterator;
+    protected Iterator<UnfilteredRowIterator> iterator;
 
     // Full scan of the sstables
     public static ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter)
@@ -80,7 +81,7 @@ public class BigTableScanner implements ISSTableScanner
         // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249)
         List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(tokenRanges);
         if (positions.isEmpty())
-            return new EmptySSTableScanner(sstable.getFilename());
+            return new EmptySSTableScanner(sstable);
 
         return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, limiter, false, makeBounds(sstable, tokenRanges).iterator());
     }
@@ -227,6 +228,11 @@ public class BigTableScanner implements ISSTableScanner
         return isForThrift;
     }
 
+    public CFMetaData metadata()
+    {
+        return sstable.metadata;
+    }
+
     public boolean hasNext()
     {
         if (iterator == null)
@@ -246,23 +252,18 @@ public class BigTableScanner implements ISSTableScanner
         throw new UnsupportedOperationException();
     }
 
-    private UnfilteredPartitionIterator createIterator()
+    private Iterator<UnfilteredRowIterator> createIterator()
     {
         return new KeyScanningIterator();
     }
 
-    protected class KeyScanningIterator extends AbstractIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator
+    protected class KeyScanningIterator extends AbstractIterator<UnfilteredRowIterator>
     {
         private DecoratedKey nextKey;
         private RowIndexEntry nextEntry;
         private DecoratedKey currentKey;
         private RowIndexEntry currentEntry;
 
-        public boolean isForThrift()
-        {
-            return isForThrift;
-        }
-
         protected UnfilteredRowIterator computeNext()
         {
             try
@@ -345,11 +346,6 @@ public class BigTableScanner implements ISSTableScanner
                 throw new CorruptSSTableException(e, sstable.getFilename());
             }
         }
-
-        public void close()
-        {
-            BigTableScanner.this.close();
-        }
     }
 
     @Override
@@ -364,11 +360,11 @@ public class BigTableScanner implements ISSTableScanner
 
     public static class EmptySSTableScanner extends AbstractUnfilteredPartitionIterator implements ISSTableScanner
     {
-        private final String filename;
+        private final SSTableReader sstable;
 
-        public EmptySSTableScanner(String filename)
+        public EmptySSTableScanner(SSTableReader sstable)
         {
-            this.filename = filename;
+            this.sstable = sstable;
         }
 
         public long getLengthInBytes()
@@ -383,7 +379,7 @@ public class BigTableScanner implements ISSTableScanner
 
         public String getBackingFiles()
         {
-            return filename;
+            return sstable.getFilename();
         }
 
         public boolean isForThrift()
@@ -391,6 +387,11 @@ public class BigTableScanner implements ISSTableScanner
             return false;
         }
 
+        public CFMetaData metadata()
+        {
+            return sstable.metadata;
+        }
+
         public boolean hasNext()
         {
             return false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index a9024e3..515a0d8 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -49,7 +49,7 @@ public class DataResolver extends ResponseResolver
     public PartitionIterator getData()
     {
         ReadResponse response = responses.iterator().next().payload;
-        return UnfilteredPartitionIterators.filter(response.makeIterator(), command.nowInSec());
+        return UnfilteredPartitionIterators.filter(response.makeIterator(command.metadata()), command.nowInSec());
     }
 
     public PartitionIterator resolve()
@@ -62,7 +62,7 @@ public class DataResolver extends ResponseResolver
         for (int i = 0; i < count; i++)
         {
             MessageIn<ReadResponse> msg = responses.get(i);
-            iters.add(msg.payload.makeIterator());
+            iters.add(msg.payload.makeIterator(command.metadata()));
             sources[i] = msg.from;
         }
 
@@ -411,7 +411,7 @@ public class DataResolver extends ResponseResolver
                 // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
                 handler.awaitResults();
                 assert resolver.responses.size() == 1;
-                return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(), retryCommand);
+                return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command.metadata()), retryCommand);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
index 12b0626..42aee04 100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -48,7 +48,7 @@ public class DigestResolver extends ResponseResolver
     public PartitionIterator getData()
     {
         assert isDataPresent();
-        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(), command.nowInSec());
+        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata()), command.nowInSec());
     }
 
     /*
@@ -77,7 +77,7 @@ public class DigestResolver extends ResponseResolver
         {
             ReadResponse response = message.payload;
 
-            ByteBuffer newDigest = response.digest();
+            ByteBuffer newDigest = response.digest(command.metadata());
             if (digest == null)
                 digest = newDigest;
             else if (!digest.equals(newDigest))
@@ -88,7 +88,7 @@ public class DigestResolver extends ResponseResolver
         if (logger.isDebugEnabled())
             logger.debug("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 
-        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(), command.nowInSec());
+        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata()), command.nowInSec());
     }
 
     public boolean isDataPresent()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a979690/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index dd57a96..efd3504 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -306,7 +306,7 @@ public class DataResolverTest
                                                                                                        .add("c2", "v2")
                                                                                                        .buildUpdate())));
         InetAddress peer2 = peer();
-        resolver.preprocess(readResponseMessage(peer2, UnfilteredPartitionIterators.EMPTY));
+        resolver.preprocess(readResponseMessage(peer2, UnfilteredPartitionIterators.empty(cfm)));
 
         try(PartitionIterator data = resolver.resolve();
             RowIterator rows = Iterators.getOnlyElement(data))
@@ -328,8 +328,8 @@ public class DataResolverTest
     public void testResolveWithBothEmpty()
     {
         DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
-        resolver.preprocess(readResponseMessage(peer(), UnfilteredPartitionIterators.EMPTY));
-        resolver.preprocess(readResponseMessage(peer(), UnfilteredPartitionIterators.EMPTY));
+        resolver.preprocess(readResponseMessage(peer(), UnfilteredPartitionIterators.empty(cfm)));
+        resolver.preprocess(readResponseMessage(peer(), UnfilteredPartitionIterators.empty(cfm)));
 
         try(PartitionIterator data = resolver.resolve())
         {