You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/12/13 09:27:32 UTC

[08/26] cassandra git commit: Thrift removal

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index e898c4f..dfcd772 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentMap;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -63,12 +62,12 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  */
 public class CompositeType extends AbstractCompositeType
 {
-    public static final int STATIC_MARKER = 0xFFFF;
+    private static final int STATIC_MARKER = 0xFFFF;
 
     public final List<AbstractType<?>> types;
 
     // interning instances
-    private static final ConcurrentMap<List<AbstractType<?>>, CompositeType> instances = new ConcurrentHashMap<List<AbstractType<?>>, CompositeType>();
+    private static final ConcurrentMap<List<AbstractType<?>>, CompositeType> instances = new ConcurrentHashMap<>();
 
     public static CompositeType getInstance(TypeParser parser) throws ConfigurationException, SyntaxException
     {
@@ -77,7 +76,7 @@ public class CompositeType extends AbstractCompositeType
 
     public static CompositeType getInstance(AbstractType... types)
     {
-        return getInstance(Arrays.<AbstractType<?>>asList(types));
+        return getInstance(Arrays.asList(types));
     }
 
     protected boolean readIsStatic(ByteBuffer bb)
@@ -128,9 +127,8 @@ public class CompositeType extends AbstractCompositeType
         }
         catch (IndexOutOfBoundsException e)
         {
-            // We shouldn't get there in general because 1) we shouldn't construct broken composites
-            // from CQL and 2) broken composites coming from thrift should be rejected by validate.
-            // There is a few cases however where, if the schema has changed since we created/validated
+            // We shouldn't get there in general because we shouldn't construct broken composites,
+            // but there are a few cases where, if the schema has changed since we created/validated
             // the composite, this will be thrown (see #6262). Those cases are a user error but
             // throwing a more meaningful error message to make understanding such error easier. .
             throw new RuntimeException("Cannot get comparator " + i + " in " + this + ". "
@@ -204,11 +202,6 @@ public class CompositeType extends AbstractCompositeType
         return l;
     }
 
-    public static byte lastEOC(ByteBuffer name)
-    {
-        return name.get(name.limit() - 1);
-    }
-
     // Extract component idx from bb. Return null if there is not enough component.
     public static ByteBuffer extractComponent(ByteBuffer bb, int idx)
     {
@@ -227,13 +220,6 @@ public class CompositeType extends AbstractCompositeType
         return null;
     }
 
-    // Extract CQL3 column name from the full column name.
-    public ByteBuffer extractLastComponent(ByteBuffer bb)
-    {
-        int idx = types.get(types.size() - 1) instanceof ColumnToCollectionType ? types.size() - 2 : types.size() - 1;
-        return extractComponent(bb, idx);
-    }
-
     public static boolean isStaticName(ByteBuffer bb)
     {
         return bb.remaining() >= 2 && (ByteBufferUtil.getShortLength(bb, bb.position()) & 0xFFFF) == STATIC_MARKER;
@@ -334,11 +320,6 @@ public class CompositeType extends AbstractCompositeType
         return getClass().getName() + TypeParser.stringifyTypeParameters(types);
     }
 
-    public Builder builder()
-    {
-        return new Builder(this);
-    }
-
     public static ByteBuffer build(ByteBuffer... buffers)
     {
         return build(false, buffers);
@@ -366,143 +347,4 @@ public class CompositeType extends AbstractCompositeType
         out.flip();
         return out;
     }
-
-    public static class Builder
-    {
-        private final CompositeType composite;
-
-        private final List<ByteBuffer> components;
-        private final byte[] endOfComponents;
-        private int serializedSize;
-        private final boolean isStatic;
-
-        public Builder(CompositeType composite)
-        {
-            this(composite, new ArrayList<ByteBuffer>(composite.types.size()), new byte[composite.types.size()], false);
-        }
-
-        public static Builder staticBuilder(CompositeType composite)
-        {
-            return new Builder(composite, new ArrayList<ByteBuffer>(composite.types.size()), new byte[composite.types.size()], true);
-        }
-
-        private Builder(CompositeType composite, List<ByteBuffer> components, byte[] endOfComponents, boolean isStatic)
-        {
-            assert endOfComponents.length == composite.types.size();
-
-            this.composite = composite;
-            this.components = components;
-            this.endOfComponents = endOfComponents;
-            this.isStatic = isStatic;
-            if (isStatic)
-                serializedSize = 2;
-        }
-
-        private Builder(Builder b)
-        {
-            this(b.composite, new ArrayList<ByteBuffer>(b.components), Arrays.copyOf(b.endOfComponents, b.endOfComponents.length), b.isStatic);
-            this.serializedSize = b.serializedSize;
-        }
-
-        public Builder add(ByteBuffer bb)
-        {
-            if (components.size() >= composite.types.size())
-                throw new IllegalStateException("Composite column is already fully constructed");
-
-            components.add(bb);
-            serializedSize += 3 + bb.remaining(); // 2 bytes lenght + 1 byte eoc
-            return this;
-        }
-
-        public Builder add(ColumnIdentifier name)
-        {
-            return add(name.bytes);
-        }
-
-        public int componentCount()
-        {
-            return components.size();
-        }
-
-        public int remainingCount()
-        {
-            return composite.types.size() - components.size();
-        }
-
-        public ByteBuffer get(int i)
-        {
-            return components.get(i);
-        }
-
-        public ByteBuffer build()
-        {
-            try (DataOutputBuffer out = new DataOutputBufferFixed(serializedSize))
-            {
-                if (isStatic)
-                    out.writeShort(STATIC_MARKER);
-
-                for (int i = 0; i < components.size(); i++)
-                {
-                    ByteBufferUtil.writeWithShortLength(components.get(i), out);
-                    out.write(endOfComponents[i]);
-                }
-                return ByteBuffer.wrap(out.getData(), 0, out.getLength());
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-
-        public ByteBuffer buildAsEndOfRange()
-        {
-            if (components.isEmpty())
-                return ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
-            ByteBuffer bb = build();
-            bb.put(bb.remaining() - 1, (byte)1);
-            return bb;
-        }
-
-        public ByteBuffer buildForRelation(Operator op)
-        {
-            /*
-             * Given the rules for eoc (end-of-component, see AbstractCompositeType.compare()),
-             * We can select:
-             *   - = 'a' by using <'a'><0>
-             *   - < 'a' by using <'a'><-1>
-             *   - <= 'a' by using <'a'><1>
-             *   - > 'a' by using <'a'><1>
-             *   - >= 'a' by using <'a'><0>
-             */
-            int current = components.size() - 1;
-            switch (op)
-            {
-                case LT:
-                    endOfComponents[current] = (byte) -1;
-                    break;
-                case GT:
-                case LTE:
-                    endOfComponents[current] = (byte) 1;
-                    break;
-                default:
-                    endOfComponents[current] = (byte) 0;
-                    break;
-            }
-            return build();
-        }
-
-        public Builder copy()
-        {
-            return new Builder(this);
-        }
-
-        public ByteBuffer getComponent(int i)
-        {
-            if (i >= components.size())
-                throw new IllegalArgumentException();
-
-            return components.get(i);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
index afe1cc3..90d64f4 100644
--- a/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
@@ -36,24 +36,17 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
     private final int cachedLiveRows;
     private final int rowsWithNonExpiringCells;
 
-    private final int nonTombstoneCellCount;
-    private final int nonExpiringLiveCells;
-
     private CachedBTreePartition(CFMetaData metadata,
                                  DecoratedKey partitionKey,
                                  Holder holder,
                                  int createdAtInSec,
                                  int cachedLiveRows,
-                                 int rowsWithNonExpiringCells,
-                                 int nonTombstoneCellCount,
-                                 int nonExpiringLiveCells)
+                                 int rowsWithNonExpiringCells)
     {
         super(metadata, partitionKey, holder);
         this.createdAtInSec = createdAtInSec;
         this.cachedLiveRows = cachedLiveRows;
         this.rowsWithNonExpiringCells = rowsWithNonExpiringCells;
-        this.nonTombstoneCellCount = nonTombstoneCellCount;
-        this.nonExpiringLiveCells = nonExpiringLiveCells;
     }
 
     /**
@@ -89,30 +82,24 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
 
         int cachedLiveRows = 0;
         int rowsWithNonExpiringCells = 0;
-        int nonTombstoneCellCount = 0;
-        int nonExpiringLiveCells = 0;
 
         for (Row row : BTree.<Row>iterable(holder.tree))
         {
             if (row.hasLiveData(nowInSec))
                 ++cachedLiveRows;
 
-            int nonExpiringLiveCellsThisRow = 0;
+            boolean hasNonExpiringLiveCell = false;
             for (Cell cell : row.cells())
             {
-                if (!cell.isTombstone())
+                if (!cell.isTombstone() && !cell.isExpiring())
                 {
-                    ++nonTombstoneCellCount;
-                    if (!cell.isExpiring())
-                        ++nonExpiringLiveCellsThisRow;
+                    hasNonExpiringLiveCell = true;
+                    break;
                 }
             }
 
-            if (nonExpiringLiveCellsThisRow > 0)
-            {
+            if (hasNonExpiringLiveCell)
                 ++rowsWithNonExpiringCells;
-                nonExpiringLiveCells += nonExpiringLiveCellsThisRow;
-            }
         }
 
         return new CachedBTreePartition(iterator.metadata(),
@@ -120,9 +107,7 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
                                         holder,
                                         nowInSec,
                                         cachedLiveRows,
-                                        rowsWithNonExpiringCells,
-                                        nonTombstoneCellCount,
-                                        nonExpiringLiveCells);
+                                        rowsWithNonExpiringCells);
     }
 
     /**
@@ -153,16 +138,6 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
         return rowsWithNonExpiringCells;
     }
 
-    public int nonTombstoneCellCount()
-    {
-        return nonTombstoneCellCount;
-    }
-
-    public int nonExpiringLiveCells()
-    {
-        return nonExpiringLiveCells;
-    }
-
     static class Serializer implements ISerializer<CachedPartition>
     {
         public void serialize(CachedPartition partition, DataOutputPlus out) throws IOException
@@ -175,8 +150,6 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
             out.writeInt(p.createdAtInSec);
             out.writeInt(p.cachedLiveRows);
             out.writeInt(p.rowsWithNonExpiringCells);
-            out.writeInt(p.nonTombstoneCellCount);
-            out.writeInt(p.nonExpiringLiveCells);
             CFMetaData.serializer.serialize(partition.metadata(), out, version);
             try (UnfilteredRowIterator iter = p.unfilteredIterator())
             {
@@ -198,8 +171,6 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
             int createdAtInSec = in.readInt();
             int cachedLiveRows = in.readInt();
             int rowsWithNonExpiringCells = in.readInt();
-            int nonTombstoneCellCount = in.readInt();
-            int nonExpiringLiveCells = in.readInt();
 
 
             CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
@@ -217,9 +188,7 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
                                                   holder,
                                                   createdAtInSec,
                                                   cachedLiveRows,
-                                                  rowsWithNonExpiringCells,
-                                                  nonTombstoneCellCount,
-                                                  nonExpiringLiveCells);
+                                                  rowsWithNonExpiringCells);
 
         }
 
@@ -235,8 +204,6 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac
                 return TypeSizes.sizeof(p.createdAtInSec)
                      + TypeSizes.sizeof(p.cachedLiveRows)
                      + TypeSizes.sizeof(p.rowsWithNonExpiringCells)
-                     + TypeSizes.sizeof(p.nonTombstoneCellCount)
-                     + TypeSizes.sizeof(p.nonExpiringLiveCells)
                      + CFMetaData.serializer.serializedSize(partition.metadata(), version)
                      + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, MessagingService.current_version, p.rowCount());
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/partitions/CachedPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CachedPartition.java b/src/java/org/apache/cassandra/db/partitions/CachedPartition.java
index 0cbaba0..6c781f5 100644
--- a/src/java/org/apache/cassandra/db/partitions/CachedPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/CachedPartition.java
@@ -71,24 +71,4 @@ public interface CachedPartition extends Partition, IRowCacheEntry
      * @return the last row of the partition, or {@code null} if the partition is empty.
      */
     public Row lastRow();
-
-    /**
-     * The number of {@code cell} objects that are not tombstone in this cached partition.
-     *
-     * Please note that this is <b>not</b> the number of <em>live</em> cells since
-     * some of the cells might be expired.
-     *
-     * @return the number of non tombstone cells in the partition.
-     */
-    public int nonTombstoneCellCount();
-
-    /**
-     * The number of cells in this cached partition that are neither tombstone nor expiring.
-     *
-     * Note that this is generally not a very meaningful number, but this is used by
-     * {@link org.apache.cassandra.db.filter.DataLimits#hasEnoughLiveData} as an optimization.
-     *
-     * @return the number of cells that are neither tombstones nor expiring.
-     */
-    public int nonExpiringLiveCells();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
index 6679bdf..b5580c6 100644
--- a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
+++ b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
@@ -25,14 +25,12 @@ import org.apache.cassandra.db.transform.Transformation;
 
 public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator>
 {
-    private final boolean isForThrift;
     private final DeletionPurger purger;
     private final int nowInSec;
     private boolean isReverseOrder;
 
-    public PurgeFunction(boolean isForThrift, int nowInSec, int gcBefore, int oldestUnrepairedTombstone, boolean onlyPurgeRepairedTombstones)
+    public PurgeFunction(int nowInSec, int gcBefore, int oldestUnrepairedTombstone, boolean onlyPurgeRepairedTombstones)
     {
-        this.isForThrift = isForThrift;
         this.nowInSec = nowInSec;
         this.purger = (timestamp, localDeletionTime) ->
                       !(onlyPurgeRepairedTombstones && localDeletionTime >= oldestUnrepairedTombstone)
@@ -64,7 +62,7 @@ public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator
 
         isReverseOrder = partition.isReverseOrder();
         UnfilteredRowIterator purged = Transformation.apply(partition, this);
-        if (!isForThrift && purged.isEmpty())
+        if (purged.isEmpty())
         {
             onEmptyPartitionPostPurge(purged.partitionKey());
             purged.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
index 1f966db..3b968e6 100644
--- a/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
+++ b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
@@ -25,18 +25,11 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 public class SingletonUnfilteredPartitionIterator implements UnfilteredPartitionIterator
 {
     private final UnfilteredRowIterator iter;
-    private final boolean isForThrift;
     private boolean returned;
 
-    public SingletonUnfilteredPartitionIterator(UnfilteredRowIterator iter, boolean isForThrift)
+    public SingletonUnfilteredPartitionIterator(UnfilteredRowIterator iter)
     {
         this.iter = iter;
-        this.isForThrift = isForThrift;
-    }
-
-    public boolean isForThrift()
-    {
-        return isForThrift;
     }
 
     public CFMetaData metadata()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
index 201c934..872225f 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
@@ -30,16 +30,5 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
  */
 public interface UnfilteredPartitionIterator extends BasePartitionIterator<UnfilteredRowIterator>
 {
-    /**
-     * Whether that partition iterator is for a thrift queries.
-     * <p>
-     * If this is true, the partition iterator may return some empty UnfilteredRowIterator and those
-     * should be preserved as thrift include partitions that "exists" (have some cells even
-     * if this are actually deleted) but have nothing matching the query.
-     *
-     * @return whether the iterator is for a thrift query.
-     */
-    public boolean isForThrift();
-
     public CFMetaData metadata();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index bcc8d4d..d86e8b1 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -113,7 +113,6 @@ public abstract class UnfilteredPartitionIterators
         assert listener != null;
         assert !iterators.isEmpty();
 
-        final boolean isForThrift = iterators.get(0).isForThrift();
         final CFMetaData metadata = iterators.get(0).metadata();
 
         final MergeIterator<UnfilteredRowIterator, UnfilteredRowIterator> merged = MergeIterator.get(iterators, partitionComparator, new MergeIterator.Reducer<UnfilteredRowIterator, UnfilteredRowIterator>()
@@ -155,11 +154,6 @@ public abstract class UnfilteredPartitionIterators
 
         return new AbstractUnfilteredPartitionIterator()
         {
-            public boolean isForThrift()
-            {
-                return isForThrift;
-            }
-
             public CFMetaData metadata()
             {
                 return metadata;
@@ -190,7 +184,6 @@ public abstract class UnfilteredPartitionIterators
         if (iterators.size() == 1)
             return iterators.get(0);
 
-        final boolean isForThrift = iterators.get(0).isForThrift();
         final CFMetaData metadata = iterators.get(0).metadata();
 
         final MergeIterator<UnfilteredRowIterator, UnfilteredRowIterator> merged = MergeIterator.get(iterators, partitionComparator, new MergeIterator.Reducer<UnfilteredRowIterator, UnfilteredRowIterator>()
@@ -221,11 +214,6 @@ public abstract class UnfilteredPartitionIterators
 
         return new AbstractUnfilteredPartitionIterator()
         {
-            public boolean isForThrift()
-            {
-                return isForThrift;
-            }
-
             public CFMetaData metadata()
             {
                 return metadata;
@@ -301,7 +289,9 @@ public abstract class UnfilteredPartitionIterators
     {
         public void serialize(UnfilteredPartitionIterator iter, ColumnFilter selection, DataOutputPlus out, int version) throws IOException
         {
-            out.writeBoolean(iter.isForThrift());
+            // Previously, a boolean indicating if this was for a thrift query.
+            // Unused since 4.0 but kept on wire for compatibility.
+            out.writeBoolean(false);
             while (iter.hasNext())
             {
                 out.writeBoolean(true);
@@ -315,7 +305,8 @@ public abstract class UnfilteredPartitionIterators
 
         public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final CFMetaData metadata, final ColumnFilter selection, final SerializationHelper.Flag flag) throws IOException
         {
-            final boolean isForThrift = in.readBoolean();
+            // Skip now unused isForThrift boolean
+            in.readBoolean();
 
             return new AbstractUnfilteredPartitionIterator()
             {
@@ -323,11 +314,6 @@ public abstract class UnfilteredPartitionIterators
                 private boolean hasNext;
                 private boolean nextReturned = true;
 
-                public boolean isForThrift()
-                {
-                    return isForThrift;
-                }
-
                 public CFMetaData metadata()
                 {
                     return metadata;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
index 30a0a37..8dbc606 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.io.sstable.IndexInfo;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.thrift.ThriftResultsMerger;
 import org.apache.cassandra.utils.IteratorWithLowerBound;
 
 /**
@@ -48,27 +47,18 @@ public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilt
     private final SSTableReader sstable;
     private final ClusteringIndexFilter filter;
     private final ColumnFilter selectedColumns;
-    private final boolean isForThrift;
-    private final int nowInSec;
-    private final boolean applyThriftTransformation;
     private ClusteringBound lowerBound;
     private boolean firstItemRetrieved;
 
     public UnfilteredRowIteratorWithLowerBound(DecoratedKey partitionKey,
                                                SSTableReader sstable,
                                                ClusteringIndexFilter filter,
-                                               ColumnFilter selectedColumns,
-                                               boolean isForThrift,
-                                               int nowInSec,
-                                               boolean applyThriftTransformation)
+                                               ColumnFilter selectedColumns)
     {
         super(partitionKey);
         this.sstable = sstable;
         this.filter = filter;
         this.selectedColumns = selectedColumns;
-        this.isForThrift = isForThrift;
-        this.nowInSec = nowInSec;
-        this.applyThriftTransformation = applyThriftTransformation;
         this.lowerBound = null;
         this.firstItemRetrieved = false;
     }
@@ -102,10 +92,8 @@ public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilt
         sstable.incrementReadCount();
 
         @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
-        UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), selectedColumns, filter.isReversed(), isForThrift);
-        return isForThrift && applyThriftTransformation
-               ? ThriftResultsMerger.maybeWrap(iter, nowInSec)
-               : iter;
+        UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), selectedColumns, filter.isReversed());
+        return iter;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/transform/Filter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/Filter.java b/src/java/org/apache/cassandra/db/transform/Filter.java
index 48c8b1a..729f06b 100644
--- a/src/java/org/apache/cassandra/db/transform/Filter.java
+++ b/src/java/org/apache/cassandra/db/transform/Filter.java
@@ -25,7 +25,7 @@ import org.apache.cassandra.db.rows.*;
 
 final class Filter extends Transformation
 {
-    private final boolean filterEmpty; // generally maps to !isForThrift, but also false for direct row filtration
+    private final boolean filterEmpty; // generally true except for direct row filtration
     private final int nowInSec;
     public Filter(boolean filterEmpty, int nowInSec)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
index 09e36b4..ed643bb 100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
@@ -52,7 +52,7 @@ public final class FilteredPartitions extends BasePartitions<RowIterator, BasePa
      */
     public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs)
     {
-        Filter filter = new Filter(!iterator.isForThrift(), nowInSecs);
+        Filter filter = new Filter(true, nowInSecs);
         if (iterator instanceof UnfilteredPartitions)
             return new FilteredPartitions(filter, (UnfilteredPartitions) iterator);
         return new FilteredPartitions(iterator, filter);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java b/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
index bad14ad..de0a51b 100644
--- a/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
@@ -26,18 +26,10 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 
 final class UnfilteredPartitions extends BasePartitions<UnfilteredRowIterator, UnfilteredPartitionIterator> implements UnfilteredPartitionIterator
 {
-    final boolean isForThrift;
-
     // wrap an iterator for transformation
     public UnfilteredPartitions(UnfilteredPartitionIterator input)
     {
         super(input);
-        this.isForThrift = input.isForThrift();
-    }
-
-    public boolean isForThrift()
-    {
-        return isForThrift;
     }
 
     public CFMetaData metadata()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index a4deb4a..078d712 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -28,16 +28,10 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.schema.CompressionParams;
-import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.Pair;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TTransport;
 
 public class ConfigHelper
 {
@@ -59,16 +53,13 @@ public class ConfigHelper
     private static final int DEFAULT_SPLIT_SIZE = 64 * 1024;
     private static final String RANGE_BATCH_SIZE_CONFIG = "cassandra.range.batch.size";
     private static final int DEFAULT_RANGE_BATCH_SIZE = 4096;
-    private static final String INPUT_THRIFT_PORT = "cassandra.input.thrift.port";
-    private static final String OUTPUT_THRIFT_PORT = "cassandra.output.thrift.port";
-    private static final String INPUT_INITIAL_THRIFT_ADDRESS = "cassandra.input.thrift.address";
-    private static final String OUTPUT_INITIAL_THRIFT_ADDRESS = "cassandra.output.thrift.address";
+    private static final String INPUT_INITIAL_ADDRESS = "cassandra.input.address";
+    private static final String OUTPUT_INITIAL_ADDRESS = "cassandra.output.address";
     private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read";
     private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";
     private static final String OUTPUT_COMPRESSION_CLASS = "cassandra.output.compression.class";
     private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = "cassandra.output.compression.length";
     private static final String OUTPUT_LOCAL_DC_ONLY = "cassandra.output.local.dc.only";
-    private static final String THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB = "cassandra.thrift.framed.size_mb";
 
     private static final Logger logger = LoggerFactory.getLogger(ConfigHelper.class);
 
@@ -213,104 +204,28 @@ public class ConfigHelper
     }
 
     /**
-     * Set the predicate that determines what columns will be selected from each row.
-     *
-     * @param conf      Job configuration you are about to run
-     * @param predicate
-     */
-    public static void setInputSlicePredicate(Configuration conf, SlicePredicate predicate)
-    {
-        conf.set(INPUT_PREDICATE_CONFIG, thriftToString(predicate));
-    }
-
-    public static SlicePredicate getInputSlicePredicate(Configuration conf)
-    {
-        String s = conf.get(INPUT_PREDICATE_CONFIG);
-        return s == null ? null : predicateFromString(s);
-    }
-
-    private static String thriftToString(TBase object)
-    {
-        assert object != null;
-        // this is so awful it's kind of cool!
-        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
-        try
-        {
-            return Hex.bytesToHex(serializer.serialize(object));
-        }
-        catch (TException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static SlicePredicate predicateFromString(String st)
-    {
-        assert st != null;
-        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
-        SlicePredicate predicate = new SlicePredicate();
-        try
-        {
-            deserializer.deserialize(predicate, Hex.hexToBytes(st));
-        }
-        catch (TException e)
-        {
-            throw new RuntimeException(e);
-        }
-        return predicate;
-    }
-
-    /**
      * Set the KeyRange to limit the rows.
      * @param conf Job configuration you are about to run
      */
     public static void setInputRange(Configuration conf, String startToken, String endToken)
     {
-        KeyRange range = new KeyRange().setStart_token(startToken).setEnd_token(endToken);
-        conf.set(INPUT_KEYRANGE_CONFIG, thriftToString(range));
+        conf.set(INPUT_KEYRANGE_CONFIG, startToken + "," + endToken);
     }
 
     /**
-     * Set the KeyRange to limit the rows.
-     * @param conf Job configuration you are about to run
-     */
-    public static void setInputRange(Configuration conf, String startToken, String endToken, List<IndexExpression> filter)
-    {
-        KeyRange range = new KeyRange().setStart_token(startToken).setEnd_token(endToken).setRow_filter(filter);
-        conf.set(INPUT_KEYRANGE_CONFIG, thriftToString(range));
-    }
-
-    /**
-     * Set the KeyRange to limit the rows.
-     * @param conf Job configuration you are about to run
+     * Returns the start and end tokens of the input key range as a pair.
+     *
+     * @return the (start token, end token) pair, or null if no input key range has been set
      */
-    public static void setInputRange(Configuration conf, List<IndexExpression> filter)
-    {
-        KeyRange range = new KeyRange().setRow_filter(filter);
-        conf.set(INPUT_KEYRANGE_CONFIG, thriftToString(range));
-    }
-
-    /** may be null if unset */
-    public static KeyRange getInputKeyRange(Configuration conf)
+    public static Pair<String, String> getInputKeyRange(Configuration conf)
     {
         String str = conf.get(INPUT_KEYRANGE_CONFIG);
-        return str == null ? null : keyRangeFromString(str);
-    }
+        if (str == null)
+            return null;
 
-    private static KeyRange keyRangeFromString(String st)
-    {
-        assert st != null;
-        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
-        KeyRange keyRange = new KeyRange();
-        try
-        {
-            deserializer.deserialize(keyRange, Hex.hexToBytes(st));
-        }
-        catch (TException e)
-        {
-            throw new RuntimeException(e);
-        }
-        return keyRange;
+        String[] parts = str.split(",");
+        assert parts.length == 2;
+        return Pair.create(parts[0], parts[1]);
     }
 
     public static String getInputKeyspace(Configuration conf)
@@ -413,26 +328,15 @@ public class ConfigHelper
         conf.set(WRITE_CONSISTENCY_LEVEL, consistencyLevel);
     }
 
-    public static int getInputRpcPort(Configuration conf)
-    {
-        return Integer.parseInt(conf.get(INPUT_THRIFT_PORT, "9160"));
-    }
-
-    public static void setInputRpcPort(Configuration conf, String port)
-    {
-        conf.set(INPUT_THRIFT_PORT, port);
-    }
-
     public static String getInputInitialAddress(Configuration conf)
     {
-        return conf.get(INPUT_INITIAL_THRIFT_ADDRESS);
+        return conf.get(INPUT_INITIAL_ADDRESS);
     }
 
     public static void setInputInitialAddress(Configuration conf, String address)
     {
-        conf.set(INPUT_INITIAL_THRIFT_ADDRESS, address);
+        conf.set(INPUT_INITIAL_ADDRESS, address);
     }
-
     public static void setInputPartitioner(Configuration conf, String classname)
     {
         conf.set(INPUT_PARTITIONER_CONFIG, classname);
@@ -443,24 +347,14 @@ public class ConfigHelper
         return FBUtilities.newPartitioner(conf.get(INPUT_PARTITIONER_CONFIG));
     }
 
-    public static int getOutputRpcPort(Configuration conf)
-    {
-        return Integer.parseInt(conf.get(OUTPUT_THRIFT_PORT, "9160"));
-    }
-
-    public static void setOutputRpcPort(Configuration conf, String port)
-    {
-        conf.set(OUTPUT_THRIFT_PORT, port);
-    }
-
     public static String getOutputInitialAddress(Configuration conf)
     {
-        return conf.get(OUTPUT_INITIAL_THRIFT_ADDRESS);
+        return conf.get(OUTPUT_INITIAL_ADDRESS);
     }
 
     public static void setOutputInitialAddress(Configuration conf, String address)
     {
-        conf.set(OUTPUT_INITIAL_THRIFT_ADDRESS, address);
+        conf.set(OUTPUT_INITIAL_ADDRESS, address);
     }
 
     public static void setOutputPartitioner(Configuration conf, String classname)
@@ -493,20 +387,6 @@ public class ConfigHelper
         conf.set(OUTPUT_COMPRESSION_CHUNK_LENGTH, length);
     }
 
-    public static void setThriftFramedTransportSizeInMb(Configuration conf, int frameSizeInMB)
-    {
-        conf.setInt(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB, frameSizeInMB);
-    }
-
-    /**
-     * @param conf The configuration to use.
-     * @return Value (converts MBs to Bytes) set by {@link #setThriftFramedTransportSizeInMb(Configuration, int)} or default of 15MB
-     */
-    public static int getThriftFramedTransportSize(Configuration conf)
-    {
-        return conf.getInt(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB, 15) * 1024 * 1024; // 15MB is default in Cassandra
-    }
-
     public static boolean getOutputLocalDCOnly(Configuration conf)
     {
         return Boolean.parseBoolean(conf.get(OUTPUT_LOCAL_DC_ONLY, "false"));
@@ -517,79 +397,6 @@ public class ConfigHelper
         conf.set(OUTPUT_LOCAL_DC_ONLY, Boolean.toString(localDCOnly));
     }
 
-    public static Cassandra.Client getClientFromInputAddressList(Configuration conf) throws IOException
-    {
-        return getClientFromAddressList(conf, ConfigHelper.getInputInitialAddress(conf).split(","), ConfigHelper.getInputRpcPort(conf));
-    }
-
-    public static Cassandra.Client getClientFromOutputAddressList(Configuration conf) throws IOException
-    {
-        return getClientFromAddressList(conf, ConfigHelper.getOutputInitialAddress(conf).split(","), ConfigHelper.getOutputRpcPort(conf));
-    }
-
-    private static Cassandra.Client getClientFromAddressList(Configuration conf, String[] addresses, int port) throws IOException
-    {
-        Cassandra.Client client = null;
-        List<IOException> exceptions = new ArrayList<IOException>();
-        for (String address : addresses)
-        {
-            try
-            {
-                client = createConnection(conf, address, port);
-                break;
-            }
-            catch (IOException ioe)
-            {
-                exceptions.add(ioe);
-            }
-        }
-        if (client == null)
-        {
-            logger.error("failed to connect to any initial addresses");
-            for (IOException ioe : exceptions)
-            {
-                logger.error("", ioe);
-            }
-            throw exceptions.get(exceptions.size() - 1);
-        }
-        return client;
-    }
-
-    @SuppressWarnings("resource")
-    public static Cassandra.Client createConnection(Configuration conf, String host, Integer port) throws IOException
-    {
-        try
-        {
-            TTransport transport = getClientTransportFactory(conf).openTransport(host, port);
-            return new Cassandra.Client(new TBinaryProtocol(transport, true, true));
-        }
-        catch (Exception e)
-        {
-            throw new IOException("Unable to connect to server " + host + ":" + port, e);
-        }
-    }
-
-    public static ITransportFactory getClientTransportFactory(Configuration conf)
-    {
-        String factoryClassName = conf.get(ITransportFactory.PROPERTY_KEY, TFramedTransportFactory.class.getName());
-        ITransportFactory factory = getClientTransportFactory(factoryClassName);
-        Map<String, String> options = getOptions(conf, factory.supportedOptions());
-        factory.setOptions(options);
-        return factory;
-    }
-
-    private static ITransportFactory getClientTransportFactory(String factoryClassName)
-    {
-        try
-        {
-            return (ITransportFactory) Class.forName(factoryClassName).newInstance();
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException("Failed to instantiate transport factory:" + factoryClassName, e);
-        }
-    }
-
     private static Map<String, String> getOptions(Configuration conf, Set<String> supportedOptions)
     {
         Map<String, String> options = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index 5e47ed5..45a227b 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -42,8 +42,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.*;
-import org.apache.cassandra.thrift.KeyRange;
 import org.apache.cassandra.hadoop.*;
+import org.apache.cassandra.utils.*;
 
 import static java.util.stream.Collectors.toMap;
 
@@ -133,30 +133,12 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
              Session session = cluster.connect())
         {
             List<Future<List<org.apache.hadoop.mapreduce.InputSplit>>> splitfutures = new ArrayList<>();
-            KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
+            Pair<String, String> jobKeyRange = ConfigHelper.getInputKeyRange(conf);
             Range<Token> jobRange = null;
             if (jobKeyRange != null)
             {
-                if (jobKeyRange.start_key != null)
-                {
-                    if (!partitioner.preservesOrder())
-                        throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving partitioner");
-                    if (jobKeyRange.start_token != null)
-                        throw new IllegalArgumentException("only start_key supported");
-                    if (jobKeyRange.end_token != null)
-                        throw new IllegalArgumentException("only start_key supported");
-                    jobRange = new Range<>(partitioner.getToken(jobKeyRange.start_key),
-                                           partitioner.getToken(jobKeyRange.end_key));
-                }
-                else if (jobKeyRange.start_token != null)
-                {
-                    jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.start_token),
-                                           partitioner.getTokenFactory().fromString(jobKeyRange.end_token));
-                }
-                else
-                {
-                    logger.warn("ignoring jobKeyRange specified without start_key or start_token");
-                }
+                jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.left),
+                                       partitioner.getTokenFactory().fromString(jobKeyRange.right));
             }
 
             Metadata metadata = cluster.getMetadata();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java b/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java
index 89eebdf..8047e1d 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java
@@ -59,7 +59,7 @@ public interface CassandraIndexFunctions
      *   *other* clustering values in the index - the indexed value being the index table's partition key
      * * When the indexed value is a collection value, in which case we also need to capture the cell path from the base
      *   table
-     * * In a KEYS index (for thrift/compact storage/static column indexes), where only the base partition key is
+     * * In a KEYS index (for compact storage/static column indexes), where only the base partition key is
      *   held in the index table.
      *
      * Called from indexCfsMetadata

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/index/internal/IndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/IndexEntry.java b/src/java/org/apache/cassandra/index/internal/IndexEntry.java
index 97525d6..3e4e41b 100644
--- a/src/java/org/apache/cassandra/index/internal/IndexEntry.java
+++ b/src/java/org/apache/cassandra/index/internal/IndexEntry.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.db.DecoratedKey;
 /**
  * Entries in indexes on non-compact tables (tables with composite comparators)
  * can be encapsulated as IndexedEntry instances. These are not used when dealing
- * with indexes on static/compact/thrift tables (i.e. KEYS indexes).
+ * with indexes on static/compact tables (i.e. KEYS indexes).
  */
 public final class IndexEntry
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
index 324d939..b5e4a78 100644
--- a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
@@ -69,11 +69,6 @@ public class CompositesSearcher extends CassandraIndexSearcher
 
             private UnfilteredRowIterator next;
 
-            public boolean isForThrift()
-            {
-                return command.isForThrift();
-            }
-
             public CFMetaData metadata()
             {
                 return command.metadata();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
index 0169d3f..febb09f 100644
--- a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
@@ -23,12 +23,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.index.internal.CassandraIndex;
@@ -57,11 +55,6 @@ public class KeysSearcher extends CassandraIndexSearcher
         {
             private UnfilteredRowIterator next;
 
-            public boolean isForThrift()
-            {
-                return command.isForThrift();
-            }
-
             public CFMetaData metadata()
             {
                 return command.metadata();
@@ -92,8 +85,7 @@ public class KeysSearcher extends CassandraIndexSearcher
                         continue;
 
                     ColumnFilter extendedFilter = getExtendedFilter(command.columnFilter());
-                    SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
-                                                                                           index.baseCfs.metadata,
+                    SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata,
                                                                                            command.nowInSec(),
                                                                                            extendedFilter,
                                                                                            command.rowFilter(),
@@ -108,7 +100,6 @@ public class KeysSearcher extends CassandraIndexSearcher
                                                                    hit,
                                                                    indexKey.getKey(),
                                                                    executionController.writeOpOrderGroup(),
-                                                                   isForThrift(),
                                                                    command.nowInSec());
 
                     if (dataIter != null)
@@ -151,66 +142,23 @@ public class KeysSearcher extends CassandraIndexSearcher
                                                 Row indexHit,
                                                 ByteBuffer indexedValue,
                                                 OpOrder.Group writeOp,
-                                                boolean isForThrift,
                                                 int nowInSec)
     {
-        if (isForThrift)
+        assert iterator.metadata().isCompactTable();
+        Row data = iterator.staticRow();
+        if (index.isStale(data, indexedValue, nowInSec))
         {
-            // The data we got has gone though ThrifResultsMerger, so we're looking for the row whose clustering
-            // is the indexed name and so we need to materialize the partition.
-            ImmutableBTreePartition result = ImmutableBTreePartition.create(iterator);
+            // Index is stale, remove the index entry and ignore
+            index.deleteStaleEntry(index.getIndexCfs().decorateKey(indexedValue),
+                    makeIndexClustering(iterator.partitionKey().getKey(), Clustering.EMPTY),
+                    new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
+                    writeOp);
             iterator.close();
-            Row data = result.getRow(Clustering.make(index.getIndexedColumn().name.bytes));
-            if (data == null)
-                return null;
-
-            // for thrift tables, we need to compare the index entry against the compact value column,
-            // not the column actually designated as the indexed column so we don't use the index function
-            // lib for the staleness check like we do in every other case
-            Cell baseData = data.getCell(index.baseCfs.metadata.compactValueColumn());
-            if (baseData == null || !baseData.isLive(nowInSec) || index.getIndexedColumn().type.compare(indexedValue, baseData.value()) != 0)
-            {
-                // Index is stale, remove the index entry and ignore
-                index.deleteStaleEntry(index.getIndexCfs().decorateKey(indexedValue),
-                                         Clustering.make(index.getIndexedColumn().name.bytes),
-                                         new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
-                                         writeOp);
-                return null;
-            }
-            else
-            {
-                if (command.columnFilter().fetches(index.getIndexedColumn()))
-                    return result.unfilteredIterator();
-
-                // The query on the base table used an extended column filter to ensure that the
-                // indexed column was actually read for use in the staleness check, before
-                // returning the results we must filter the base table partition so that it
-                // contains only the originally requested columns. See CASSANDRA-11523
-                ClusteringComparator comparator = result.metadata().comparator;
-                Slices.Builder slices = new Slices.Builder(comparator);
-                for (ColumnDefinition selected : command.columnFilter().fetchedColumns())
-                    slices.add(Slice.make(comparator, selected.name.bytes));
-                return result.unfilteredIterator(ColumnFilter.all(command.metadata()), slices.build(), false);
-            }
+            return null;
         }
         else
         {
-            assert iterator.metadata().isCompactTable();
-            Row data = iterator.staticRow();
-            if (index.isStale(data, indexedValue, nowInSec))
-            {
-                // Index is stale, remove the index entry and ignore
-                index.deleteStaleEntry(index.getIndexCfs().decorateKey(indexedValue),
-                                         makeIndexClustering(iterator.partitionKey().getKey(), Clustering.EMPTY),
-                                         new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
-                                         writeOp);
-                iterator.close();
-                return null;
-            }
-            else
-            {
-                return iterator;
-            }
+            return iterator;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
index 155cd4f..336a740 100644
--- a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
@@ -63,11 +63,6 @@ public class QueryController
         this.executionStart = System.nanoTime();
     }
 
-    public boolean isForThrift()
-    {
-        return command.isForThrift();
-    }
-
     public CFMetaData metadata()
     {
         return command.metadata();
@@ -101,8 +96,7 @@ public class QueryController
             throw new NullPointerException();
         try
         {
-            SinglePartitionReadCommand partition = SinglePartitionReadCommand.create(command.isForThrift(),
-                                                                                     cfs.metadata,
+            SinglePartitionReadCommand partition = SinglePartitionReadCommand.create(cfs.metadata,
                                                                                      command.nowInSec(),
                                                                                      command.columnFilter(),
                                                                                      command.rowFilter().withoutExpressions(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
index 4410756..8a25f79 100644
--- a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
@@ -153,11 +153,6 @@ public class QueryPlan
             }
         }
 
-        public boolean isForThrift()
-        {
-            return controller.isForThrift();
-        }
-
         public CFMetaData metadata()
         {
             return controller.metadata();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index add8ddc..da37069 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1532,8 +1532,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
      */
     protected abstract RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast);
 
-    public abstract UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift);
-    public abstract UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift);
+    public abstract UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed);
+    public abstract UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed);
 
     public abstract UnfilteredRowIterator simpleIterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, boolean tombstoneOnly);
 
@@ -1693,7 +1693,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
      * @param dataRange filter to use when reading the columns
      * @return A Scanner for seeking over the rows of the SSTable.
      */
-    public abstract ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, boolean isForThrift);
+    public abstract ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange);
 
     public FileDataInput getFileDataInput(long position)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index ac7801c..02b685b 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -30,7 +30,6 @@ import org.apache.cassandra.io.sstable.format.*;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ChecksumType;
 
 /**
  * Legacy bigtable format
@@ -74,7 +73,7 @@ public class BigFormat implements SSTableFormat
     @Override
     public RowIndexEntry.IndexSerializer getIndexSerializer(CFMetaData metadata, Version version, SerializationHeader header)
     {
-        return new RowIndexEntry.Serializer(metadata, version, header);
+        return new RowIndexEntry.Serializer(version, header);
     }
 
     static class WriterFactory extends SSTableWriter.Factory

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 8deb685..613961a 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -55,19 +55,19 @@ public class BigTableReader extends SSTableReader
         super(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header);
     }
 
-    public UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
+    public UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed)
     {
         RowIndexEntry rie = getPosition(key, SSTableReader.Operator.EQ);
-        return iterator(null, key, rie, slices, selectedColumns, reversed, isForThrift);
+        return iterator(null, key, rie, slices, selectedColumns, reversed);
     }
 
-    public UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
+    public UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed)
     {
         if (indexEntry == null)
             return UnfilteredRowIterators.noRowsIterator(metadata, key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, reversed);
         return reversed
-             ? new SSTableReversedIterator(this, file, key, indexEntry, slices, selectedColumns, isForThrift, ifile)
-             : new SSTableIterator(this, file, key, indexEntry, slices, selectedColumns, isForThrift, ifile);
+             ? new SSTableReversedIterator(this, file, key, indexEntry, slices, selectedColumns, ifile)
+             : new SSTableIterator(this, file, key, indexEntry, slices, selectedColumns, ifile);
     }
 
     /**
@@ -75,9 +75,9 @@ public class BigTableReader extends SSTableReader
      * @param dataRange filter to use when reading the columns
      * @return A Scanner for seeking over the rows of the SSTable.
      */
-    public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, boolean isForThrift)
+    public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange)
     {
-        return BigTableScanner.getScanner(this, columns, dataRange, isForThrift);
+        return BigTableScanner.getScanner(this, columns, dataRange);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 5260b3a..1b33f5b 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -60,7 +60,6 @@ public class BigTableScanner implements ISSTableScanner
     private final ColumnFilter columns;
     private final DataRange dataRange;
     private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
-    private final boolean isForThrift;
     private long startScan = -1;
     private long bytesScanned = 0;
 
@@ -69,12 +68,12 @@ public class BigTableScanner implements ISSTableScanner
     // Full scan of the sstables
     public static ISSTableScanner getScanner(SSTableReader sstable)
     {
-        return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, false, Iterators.singletonIterator(fullRange(sstable)));
+        return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, Iterators.singletonIterator(fullRange(sstable)));
     }
 
-    public static ISSTableScanner getScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange, boolean isForThrift)
+    public static ISSTableScanner getScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange)
     {
-        return new BigTableScanner(sstable, columns, dataRange, isForThrift, makeBounds(sstable, dataRange).iterator());
+        return new BigTableScanner(sstable, columns, dataRange, makeBounds(sstable, dataRange).iterator());
     }
 
     public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges)
@@ -84,15 +83,15 @@ public class BigTableScanner implements ISSTableScanner
         if (positions.isEmpty())
             return new EmptySSTableScanner(sstable);
 
-        return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, false, makeBounds(sstable, tokenRanges).iterator());
+        return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, makeBounds(sstable, tokenRanges).iterator());
     }
 
     public static ISSTableScanner getScanner(SSTableReader sstable, Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
     {
-        return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, false, rangeIterator);
+        return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, rangeIterator);
     }
 
-    private BigTableScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange, boolean isForThrift, Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
+    private BigTableScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange, Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
     {
         assert sstable != null;
 
@@ -104,7 +103,6 @@ public class BigTableScanner implements ISSTableScanner
         this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata,
                                                                                                         sstable.descriptor.version,
                                                                                                         sstable.header);
-        this.isForThrift = isForThrift;
         this.rangeIterator = rangeIterator;
     }
 
@@ -182,7 +180,7 @@ public class BigTableScanner implements ISSTableScanner
                 if (indexDecoratedKey.compareTo(currentRange.left) > 0 || currentRange.contains(indexDecoratedKey))
                 {
                     // Found, just read the dataPosition and seek into index and data files
-                    long dataPosition = RowIndexEntry.Serializer.readPosition(ifile, sstable.descriptor.version);
+                    long dataPosition = RowIndexEntry.Serializer.readPosition(ifile);
                     ifile.seek(indexPosition);
                     dfile.seek(dataPosition);
                     break;
@@ -239,11 +237,6 @@ public class BigTableScanner implements ISSTableScanner
         return sstable.toString();
     }
 
-    public boolean isForThrift()
-    {
-        return isForThrift;
-    }
-
     public CFMetaData metadata()
     {
         return sstable.metadata;
@@ -359,7 +352,7 @@ public class BigTableScanner implements ISSTableScanner
                             }
 
                             ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(partitionKey());
-                            return sstable.iterator(dfile, partitionKey(), currentEntry, filter.getSlices(BigTableScanner.this.metadata()), columns, filter.isReversed(), isForThrift);
+                            return sstable.iterator(dfile, partitionKey(), currentEntry, filter.getSlices(BigTableScanner.this.metadata()), columns, filter.isReversed());
                         }
                         catch (CorruptSSTableException | IOException e)
                         {
@@ -421,11 +414,6 @@ public class BigTableScanner implements ISSTableScanner
             return sstable.getFilename();
         }
 
-        public boolean isForThrift()
-        {
-            return false;
-        }
-
         public CFMetaData metadata()
         {
             return sstable.metadata;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java b/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java
deleted file mode 100644
index 798f09e..0000000
--- a/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.scheduler;
-
-import java.util.concurrent.TimeoutException;
-
-/**
- * Implementors of IRequestScheduler must provide a constructor taking a RequestSchedulerOptions object.
- */
-public interface IRequestScheduler
-{
-    /**
-     * Queue incoming request threads
-     *
-     * @param t Thread handing the request
-     * @param id    Scheduling parameter, an id to distinguish profiles (users/keyspace)
-     * @param timeoutMS   The max time in milliseconds to spend blocking for a slot
-     */
-    public void queue(Thread t, String id, long timeoutMS) throws TimeoutException;
-
-    /**
-     * A convenience method for indicating when a particular request has completed
-     * processing, and before a return to the client
-     */
-    public void release();
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/scheduler/NoScheduler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/NoScheduler.java b/src/java/org/apache/cassandra/scheduler/NoScheduler.java
deleted file mode 100644
index d3f4ce8..0000000
--- a/src/java/org/apache/cassandra/scheduler/NoScheduler.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.scheduler;
-
-import org.apache.cassandra.config.RequestSchedulerOptions;
-
-
-/**
- * This is basically not having a scheduler, the requests are
- * processed as normally would be handled by the JVM.
- */
-public class NoScheduler implements IRequestScheduler
-{
-
-    public NoScheduler(RequestSchedulerOptions options) {}
-
-    public NoScheduler() {}
-
-    public void queue(Thread t, String id, long timeoutMS) {}
-
-    public void release() {}
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
deleted file mode 100644
index 904deb3..0000000
--- a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.scheduler;
-
-
-import java.util.Map;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeoutException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.RequestSchedulerOptions;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
-/**
- * A very basic Round Robin implementation of the RequestScheduler. It handles
- * request groups identified on user/keyspace by placing them in separate
- * queues and servicing a request from each queue in a RoundRobin fashion.
- * It optionally adds weights for each round.
- */
-public class RoundRobinScheduler implements IRequestScheduler
-{
-    private static final Logger logger = LoggerFactory.getLogger(RoundRobinScheduler.class);
-
-    //Map of queue id to weighted queue
-    private final NonBlockingHashMap<String, WeightedQueue> queues;
-
-    private final Semaphore taskCount;
-
-    // Tracks the count of threads available in all queues
-    // Used by the the scheduler thread so we don't need to busy-wait until there is a request to process
-    private final Semaphore queueSize = new Semaphore(0, false);
-
-    private final int defaultWeight;
-    private final Map<String, Integer> weights;
-
-    public RoundRobinScheduler(RequestSchedulerOptions options)
-    {
-        defaultWeight = options.default_weight;
-        weights = options.weights;
-
-        // the task count is acquired for the first time _after_ releasing a thread, so we pre-decrement
-        taskCount = new Semaphore(options.throttle_limit - 1);
-
-        queues = new NonBlockingHashMap<String, WeightedQueue>();
-        Runnable runnable = () ->
-        {
-            while (true)
-            {
-                schedule();
-            }
-        };
-        Thread scheduler = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "REQUEST-SCHEDULER");
-        scheduler.start();
-        logger.info("Started the RoundRobin Request Scheduler");
-    }
-
-    public void queue(Thread t, String id, long timeoutMS) throws TimeoutException
-    {
-        WeightedQueue weightedQueue = getWeightedQueue(id);
-
-        try
-        {
-            queueSize.release();
-            try
-            {
-                weightedQueue.put(t, timeoutMS);
-                // the scheduler will release us when a slot is available
-            }
-            catch (TimeoutException | InterruptedException e)
-            {
-                queueSize.acquireUninterruptibly();
-                throw e;
-            }
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException("Interrupted while queueing requests", e);
-        }
-    }
-
-    public void release()
-    {
-        taskCount.release();
-    }
-
-    private void schedule()
-    {
-        queueSize.acquireUninterruptibly();
-        for (Map.Entry<String,WeightedQueue> request : queues.entrySet())
-        {
-            WeightedQueue queue = request.getValue();
-            //Using the weight, process that many requests at a time (for that scheduler id)
-            for (int i=0; i<queue.weight; i++)
-            {
-                Thread t = queue.poll();
-                if (t == null)
-                    break;
-                else
-                {
-                    taskCount.acquireUninterruptibly();
-                    queueSize.acquireUninterruptibly();
-                }
-            }
-        }
-        queueSize.release();
-    }
-
-    /*
-     * Get the Queue for the respective id, if one is not available
-     * create a new queue for that corresponding id and return it
-     */
-    private WeightedQueue getWeightedQueue(String id)
-    {
-        WeightedQueue weightedQueue = queues.get(id);
-        if (weightedQueue != null)
-            // queue existed
-            return weightedQueue;
-
-        WeightedQueue maybenew = new WeightedQueue(id, getWeight(id));
-        weightedQueue = queues.putIfAbsent(id, maybenew);
-        if (weightedQueue == null)
-        {
-            return maybenew;
-        }
-
-        // another thread created the queue
-        return weightedQueue;
-    }
-
-    Semaphore getTaskCount()
-    {
-        return taskCount;
-    }
-
-    private int getWeight(String weightingVar)
-    {
-        return (weights != null && weights.containsKey(weightingVar))
-                ? weights.get(weightingVar)
-                : defaultWeight;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/WeightedQueue.java b/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
deleted file mode 100644
index 76c7e9d..0000000
--- a/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.scheduler;
-
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.metrics.LatencyMetrics;
-
-class WeightedQueue
-{
-    private final LatencyMetrics metric;
-
-    public final String key;
-    public final int weight;
-    private final SynchronousQueue<Entry> queue;
-    public WeightedQueue(String key, int weight)
-    {
-        this.key = key;
-        this.weight = weight;
-        this.queue = new SynchronousQueue<Entry>(true);
-        this.metric =  new LatencyMetrics("scheduler", "WeightedQueue", key);
-    }
-
-    public void put(Thread t, long timeoutMS) throws InterruptedException, TimeoutException
-    {
-        if (!queue.offer(new WeightedQueue.Entry(t), timeoutMS, TimeUnit.MILLISECONDS))
-            throw new TimeoutException("Failed to acquire request scheduler slot for '" + key + "'");
-    }
-
-    public Thread poll()
-    {
-        Entry e = queue.poll();
-        if (e == null)
-            return null;
-        metric.addNano(System.nanoTime() - e.creationTime);
-        return e.thread;
-    }
-
-    @Override
-    public String toString()
-    {
-        return "RoundRobinScheduler.WeightedQueue(key=" + key + " weight=" + weight + ")";
-    }
-
-    private final static class Entry
-    {
-        public final long creationTime = System.nanoTime();
-        public final Thread thread;
-        public Entry(Thread thread)
-        {
-            this.thread = thread;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4881d9c3/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 54fa7e2..26489c6 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.cache.*;
 import org.apache.cassandra.cache.AutoSavingCache.CacheSerializer;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
@@ -50,7 +49,6 @@ import org.apache.cassandra.db.partitions.CachedPartition;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -357,58 +355,25 @@ public class CacheService implements CacheServiceMBean
         {
             assert(cfs.metadata.isCounter());
             out.write(cfs.metadata.ksAndCFBytes);
-            ByteBufferUtil.writeWithLength(key.partitionKey, out);
-            ByteBufferUtil.writeWithLength(key.cellName, out);
+            key.write(out);
         }
 
         public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputPlus in, final ColumnFamilyStore cfs) throws IOException
         {
             //Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a
             //parameter so they aren't deserialized here, even though they are serialized by this serializer
-            final ByteBuffer partitionKey = ByteBufferUtil.readWithLength(in);
-            final ByteBuffer cellName = ByteBufferUtil.readWithLength(in);
+            final CounterCacheKey cacheKey = CounterCacheKey.read(cfs.metadata.ksAndCFName, in);
             if (cfs == null || !cfs.metadata.isCounter() || !cfs.isCounterCacheEnabled())
                 return null;
-            assert(cfs.metadata.isCounter());
+
             return StageManager.getStage(Stage.READ).submit(new Callable<Pair<CounterCacheKey, ClockAndCount>>()
             {
                 public Pair<CounterCacheKey, ClockAndCount> call() throws Exception
                 {
-                    DecoratedKey key = cfs.decorateKey(partitionKey);
-                    LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(cfs.metadata, cellName);
-                    ColumnDefinition column = name.column;
-                    CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
-
-                    int nowInSec = FBUtilities.nowInSeconds();
-                    ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
-                    if (path == null)
-                        builder.add(column);
-                    else
-                        builder.select(column, path);
-
-                    ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(FBUtilities.<Clustering>singleton(name.clustering, cfs.metadata.comparator), false);
-                    SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(cfs.metadata, nowInSec, key, builder.build(), filter);
-                    try (ReadExecutionController controller = cmd.executionController();
-                         RowIterator iter = UnfilteredRowIterators.filter(cmd.queryMemtableAndDisk(cfs, controller), nowInSec))
-                    {
-                        Cell cell;
-                        if (column.isStatic())
-                        {
-                            cell = iter.staticRow().getCell(column);
-                        }
-                        else
-                        {
-                            if (!iter.hasNext())
-                                return null;
-                            cell = iter.next().getCell(column);
-                        }
-
-                        if (cell == null)
-                            return null;
-
-                        ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
-                        return Pair.create(CounterCacheKey.create(cfs.metadata.ksAndCFName, partitionKey, name.clustering, column, path), clockAndCount);
-                    }
+                    ByteBuffer value = cacheKey.readCounterValue(cfs);
+                    return value == null
+                         ? null
+                         : Pair.create(cacheKey, CounterContext.instance().getLocalClockAndCount(value));
                 }
             });
         }
@@ -487,7 +452,7 @@ public class CacheService implements CacheServiceMBean
                 // wrong is during upgrade, in which case we fail at deserialization. This is not a huge deal however since 1) this is unlikely enough that
                 // this won't affect many users (if any) and only once, 2) this doesn't prevent the node from starting and 3) CASSANDRA-10219 shows that this
                 // part of the code has been broken for a while without anyone noticing (it is, btw, still broken until CASSANDRA-10219 is fixed).
-                RowIndexEntry.Serializer.skipForCache(input, BigFormat.instance.getLatestVersion());
+                RowIndexEntry.Serializer.skipForCache(input);
                 return null;
             }
             RowIndexEntry.IndexSerializer<?> indexSerializer = reader.descriptor.getFormat().getIndexSerializer(reader.metadata,