You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/08/01 00:35:38 UTC

[3/4] cassandra git commit: Revert "Revert "Stop accessing the partitioner directly via StorageService""

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 94031ab..9221090 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -42,15 +42,12 @@ import org.apache.cassandra.db.index.keys.KeysIndex;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.LocalByPartionerType;
+import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
-
 import org.apache.cassandra.utils.concurrent.Refs;
 
 /**
@@ -79,10 +76,6 @@ public abstract class SecondaryIndex
      */
     public static final String INDEX_ENTRIES_OPTION_NAME = "index_keys_and_values";
 
-    public static final AbstractType<?> keyComparator = StorageService.getPartitioner().preservesOrder()
-                                                      ? BytesType.instance
-                                                      : new LocalByPartionerType(StorageService.getPartitioner());
-
     /**
      * Base CF that has many indexes
      */
@@ -303,7 +296,7 @@ public abstract class SecondaryIndex
      */
     public DecoratedKey getIndexKeyFor(ByteBuffer value)
     {
-        return getIndexCfs().partitioner.decorateKey(value);
+        return getIndexCfs().decorateKey(value);
     }
 
     /**
@@ -381,11 +374,20 @@ public abstract class SecondaryIndex
      */
     public static CFMetaData newIndexMetadata(CFMetaData baseMetadata, ColumnDefinition def)
     {
+        return newIndexMetadata(baseMetadata, def, def.type);
+    }
+
+    /**
+     * Creates the metadata of the index table for a given column of a given table; the index table uses a LocalPartitioner built on {@code comparator}.
+     */
+    static CFMetaData newIndexMetadata(CFMetaData baseMetadata, ColumnDefinition def, AbstractType<?> comparator)
+    {
         if (def.getIndexType() == IndexType.CUSTOM)
             return null;
 
         CFMetaData.Builder builder = CFMetaData.Builder.create(baseMetadata.ksName, baseMetadata.indexColumnFamilyName(def))
                                                        .withId(baseMetadata.cfId)
+                                                       .withPartitioner(new LocalPartitioner(comparator))
                                                        .addPartitionKey(def.name, def.type);
 
         if (def.getIndexType() == IndexType.COMPOSITES)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index 42861c5..29f235c 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -101,7 +101,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
 
     protected static void addGenericClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
     {
-        indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator);
+        indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
         for (ColumnDefinition def : baseMetadata.clusteringColumns())
             indexMetadata.addClusteringColumn(def.name, def.type);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
index 6529ad9..cd4aff9 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
@@ -24,7 +24,6 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 /**
@@ -48,7 +47,7 @@ public class CompositesIndexOnClusteringKey extends CompositesIndex
 {
     public static void addClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
     {
-        indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator);
+        indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
 
         List<ColumnDefinition> cks = baseMetadata.clusteringColumns();
         for (int i = 0; i < columnDef.position(); i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index d322faf..b76bf7e 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -111,7 +111,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                 // *data* for a given partition.
                 BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(baseCfs.getComparator());
                 List<CompositesIndex.IndexedEntry> entries = new ArrayList<>();
-                DecoratedKey partitionKey = baseCfs.partitioner.decorateKey(nextEntry.indexedKey);
+                DecoratedKey partitionKey = baseCfs.decorateKey(nextEntry.indexedKey);
 
                 while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey))
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
index 7930bd6..478559a 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
@@ -23,7 +23,6 @@ import java.util.Set;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
@@ -42,7 +41,7 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
 {
     public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition cfDef)
     {
-        indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator);
+        indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index bcaf70b..53a9b4a 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -84,7 +84,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
                 while (next == null && indexHits.hasNext())
                 {
                     Row hit = indexHits.next();
-                    DecoratedKey key = baseCfs.partitioner.decorateKey(hit.clustering().get(0));
+                    DecoratedKey key = baseCfs.decorateKey(hit.clustering().get(0));
 
                     SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
                                                                                            baseCfs.metadata,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java b/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
deleted file mode 100644
index e02ba3c..0000000
--- a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.marshal;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.cql3.Term;
-import org.apache.cassandra.db.PartitionPosition;
-import org.apache.cassandra.serializers.TypeSerializer;
-import org.apache.cassandra.serializers.MarshalException;
-
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-/** for sorting columns representing row keys in the row ordering as determined by a partitioner.
- * Not intended for user-defined CFs, and will in fact error out if used with such. */
-public class LocalByPartionerType extends AbstractType<ByteBuffer>
-{
-    private final IPartitioner partitioner;
-
-    public LocalByPartionerType(IPartitioner partitioner)
-    {
-        this.partitioner = partitioner;
-    }
-
-    public static LocalByPartionerType getInstance(TypeParser parser)
-    {
-        return new LocalByPartionerType(StorageService.getPartitioner());
-    }
-
-    @Override
-    public ByteBuffer compose(ByteBuffer bytes)
-    {
-        throw new UnsupportedOperationException("You can't do this with a local partitioner.");
-    }
-
-    @Override
-    public ByteBuffer decompose(ByteBuffer bytes)
-    {
-        throw new UnsupportedOperationException("You can't do this with a local partitioner.");
-    }
-
-    public String getString(ByteBuffer bytes)
-    {
-        return ByteBufferUtil.bytesToHex(bytes);
-    }
-
-    public ByteBuffer fromString(String source)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Term fromJSONObject(Object parsed)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public String toJSONString(ByteBuffer buffer, int protocolVersion)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public int compare(ByteBuffer o1, ByteBuffer o2)
-    {
-        // o1 and o2 can be empty so we need to use PartitionPosition, not DecoratedKey
-        return PartitionPosition.ForKey.get(o1, partitioner).compareTo(PartitionPosition.ForKey.get(o2, partitioner));
-    }
-
-    @Override
-    public void validate(ByteBuffer bytes) throws MarshalException
-    {
-        throw new IllegalStateException("You shouldn't be validating this.");
-    }
-
-    public TypeSerializer<ByteBuffer> getSerializer()
-    {
-        throw new UnsupportedOperationException("You can't do this with a local partitioner.");
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java b/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java
new file mode 100644
index 0000000..efaea53
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.marshal;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.serializers.TypeSerializer;
+import org.apache.cassandra.serializers.MarshalException;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/** Comparator type that sorts columns holding row keys in the row order determined by a partitioner.
+ * Not intended for user-defined CFs, and will in fact error out if used with such. */
+public class PartitionerDefinedOrder extends AbstractType<ByteBuffer>
+{
+    private final IPartitioner partitioner;
+
+    public PartitionerDefinedOrder(IPartitioner partitioner)
+    {
+        this.partitioner = partitioner;
+    }
+
+    @Override
+    public ByteBuffer compose(ByteBuffer bytes)
+    {
+        throw new UnsupportedOperationException("You can't do this with a local partitioner.");
+    }
+
+    @Override
+    public ByteBuffer decompose(ByteBuffer bytes)
+    {
+        throw new UnsupportedOperationException("You can't do this with a local partitioner.");
+    }
+
+    public String getString(ByteBuffer bytes)
+    {
+        return ByteBufferUtil.bytesToHex(bytes);
+    }
+
+    public ByteBuffer fromString(String source)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Term fromJSONObject(Object parsed)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String toJSONString(ByteBuffer buffer, int protocolVersion)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public int compare(ByteBuffer o1, ByteBuffer o2)
+    {
+        // o1 and o2 can be empty so we need to use PartitionPosition, not DecoratedKey
+        return PartitionPosition.ForKey.get(o1, partitioner).compareTo(PartitionPosition.ForKey.get(o2, partitioner));
+    }
+
+    @Override
+    public void validate(ByteBuffer bytes) throws MarshalException
+    {
+        throw new IllegalStateException("You shouldn't be validating this.");
+    }
+
+    public TypeSerializer<ByteBuffer> getSerializer()
+    {
+        throw new UnsupportedOperationException("You can't do this with a local partitioner.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index e8ec4c0..25d1887 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
@@ -42,7 +43,6 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Locks;
 import org.apache.cassandra.utils.memory.MemtableAllocator;
 import org.apache.cassandra.utils.memory.HeapAllocator;
-import org.apache.cassandra.service.StorageService;
 import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
 import static org.apache.cassandra.utils.btree.BTree.Dir.desc;
 
@@ -59,7 +59,7 @@ public class AtomicBTreePartition implements Partition
     private static final Logger logger = LoggerFactory.getLogger(AtomicBTreePartition.class);
 
     public static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreePartition(CFMetaData.createFake("keyspace", "table"),
-                                                                                       StorageService.getPartitioner().decorateKey(ByteBuffer.allocate(1)),
+                                                                                       DatabaseDescriptor.getPartitioner().decorateKey(ByteBuffer.allocate(1)),
                                                                                        null));
 
     // Reserved values for wasteTracker field. These values must not be consecutive (see avoidReservedValues)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 102008f..f2e0617 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -102,6 +102,17 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
         this(metadata, key, columns, Rows.EMPTY_STATIC_ROW, new ArrayList<>(initialRowCapacity), MutableDeletionInfo.live(), null, false, true);
     }
 
+    public PartitionUpdate(CFMetaData metadata,
+                           ByteBuffer key,
+                           PartitionColumns columns,
+                           int initialRowCapacity)
+    {
+        this(metadata,
+             metadata.decorateKey(key),
+             columns,
+             initialRowCapacity);
+    }
+
     /**
      * Creates a empty immutable partition update.
      *
@@ -134,7 +145,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
      * Creates an immutable partition update that contains a single row update.
      *
      * @param metadata the metadata for the created update.
-     * @param key the partition key for the partition that the created update should delete.
+     * @param key the partition key for the partition to update.
      * @param row the row for the update.
      *
      * @return the newly created partition update containing only {@code row}.
@@ -147,6 +158,20 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
     }
 
     /**
+     * Creates an immutable partition update that contains a single row update.
+     *
+     * @param metadata the metadata for the created update.
+     * @param key the raw partition key (decorated using {@code metadata}) for the partition to update.
+     * @param row the row for the update.
+     *
+     * @return the newly created partition update containing only {@code row}.
+     */
+    public static PartitionUpdate singleRowUpdate(CFMetaData metadata, ByteBuffer key, Row row)
+    {
+        return singleRowUpdate(metadata, metadata.decorateKey(key), row);
+    }
+
+    /**
      * Turns the given iterator into an update.
      *
      * Warning: this method does not close the provided iterator, it is up to
@@ -262,6 +287,21 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
     }
 
     /**
+     * Creates a partition update that entirely deletes a given partition.
+     *
+     * @param metadata the metadata for the created update.
+     * @param key the raw partition key (decorated using {@code metadata}) for the partition that the created update should delete.
+     * @param timestamp the timestamp for the deletion.
+     * @param nowInSec the current time in seconds to use as local deletion time for the partition deletion.
+     *
+     * @return the newly created partition deletion update.
+     */
+    public static PartitionUpdate fullPartitionDelete(CFMetaData metadata, ByteBuffer key, long timestamp, int nowInSec)
+    {
+        return fullPartitionDelete(metadata, metadata.decorateKey(key), timestamp, nowInSec);
+    }
+
+    /**
      * Merges the provided updates, yielding a new update that incorporates all those updates.
      *
      * @param updates the collection of updates to merge. This shouldn't be empty.
@@ -695,29 +735,39 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
             }
         }
 
-        public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, DecoratedKey key) throws IOException
+        public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException
         {
-            if (version < MessagingService.VERSION_30)
+            if (version >= MessagingService.VERSION_30)
+            {
+                assert key == null; // key is only there for the old format
+                return deserialize30(in, version, flag);
+            }
+            else
             {
                 assert key != null;
-
-                // This is only used in mutation, and mutation have never allowed "null" column families
-                boolean present = in.readBoolean();
-                assert present;
-
-                CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
-                LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version);
-                int size = in.readInt();
-                Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.deserializeCells(metadata, in, flag, size);
-                SerializationHelper helper = new SerializationHelper(metadata, version, flag);
-                try (UnfilteredRowIterator iterator = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, key, info, cells, false, helper))
-                {
-                    return PartitionUpdate.fromIterator(iterator);
-                }
+                CFMetaData metadata = deserializeMetadata(in, version);
+                DecoratedKey dk = metadata.decorateKey(key);
+                return deserializePre30(in, version, flag, metadata, dk);
             }
+        }
 
-            assert key == null; // key is only there for the old format
+        // Used to share the same decorated key between updates.
+        public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, DecoratedKey key) throws IOException
+        {
+            if (version >= MessagingService.VERSION_30)
+            {
+                return deserialize30(in, version, flag);
+            }
+            else
+            {
+                assert key != null;
+                CFMetaData metadata = deserializeMetadata(in, version);
+                return deserializePre30(in, version, flag, metadata, key);
+            }
+        }
 
+        private static PartitionUpdate deserialize30(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
+        {
             CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
             UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, metadata, flag);
             if (header.isEmpty)
@@ -752,6 +802,28 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
                                        false);
         }
 
+        private static CFMetaData deserializeMetadata(DataInputPlus in, int version) throws IOException
+        {
+            // This is only used in mutations, and mutations have never allowed "null" column families
+            boolean present = in.readBoolean();
+            assert present;
+
+            CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+            return metadata;
+        }
+
+        private static PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, CFMetaData metadata, DecoratedKey dk) throws IOException
+        {
+            LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version);
+            int size = in.readInt();
+            Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.deserializeCells(metadata, in, flag, size);
+            SerializationHelper helper = new SerializationHelper(metadata, version, flag);
+            try (UnfilteredRowIterator iterator = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, dk, info, cells, false, helper))
+            {
+                return PartitionUpdate.fromIterator(iterator);
+            }
+        }
+
         public long serializedSize(PartitionUpdate update, int version)
         {
             if (version < MessagingService.VERSION_30)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index b96e0b1..531bd26 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
@@ -170,7 +169,7 @@ public class UnfilteredRowIteratorSerializer
 
     public Header deserializeHeader(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag) throws IOException
     {
-        DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithVIntLength(in));
+        DecoratedKey key = metadata.decorateKey(ByteBufferUtil.readWithVIntLength(in));
         int flags = in.readUnsignedByte();
         boolean isReversed = (flags & IS_REVERSED) != 0;
         if ((flags & IS_EMPTY) != 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/view/MaterializedView.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java
index 082c71d..f36abae 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedView.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedView.java
@@ -303,9 +303,10 @@ public class MaterializedView
             partitionKey[i] = value;
         }
 
-        return getViewCfs().partitioner.decorateKey(CFMetaData.serializePartitionKey(getViewCfs().metadata
-                                                                                     .getKeyValidatorAsClusteringComparator()
-                                                                                     .make(partitionKey)));
+        CFMetaData metadata = getViewCfs().metadata;
+        return metadata.decorateKey(CFMetaData.serializePartitionKey(metadata
+                                                                     .getKeyValidatorAsClusteringComparator()
+                                                                     .make(partitionKey)));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/view/TemporalRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java
index 53e4e91..d0ba5ea 100644
--- a/src/java/org/apache/cassandra/db/view/TemporalRow.java
+++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java
@@ -377,7 +377,7 @@ public class TemporalRow
             this.baseCfs = baseCfs;
             this.viewPrimaryKey = viewPrimaryKey;
             this.key = key;
-            this.dk = baseCfs.partitioner.decorateKey(key);
+            this.dk = baseCfs.decorateKey(key);
             this.clusteringToRow = new HashMap<>();
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 2cb7f61..31fda34 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -164,7 +164,7 @@ public class BootStrapper extends ProgressEventNotifierSupport
         // if user specified tokens, use those
         if (initialTokens.size() > 0)
             return getSpecifiedTokens(metadata, initialTokens);
-        
+
         int numTokens = DatabaseDescriptor.getNumTokens();
         if (numTokens < 1)
             throw new ConfigurationException("num_tokens must be >= 1");
@@ -179,13 +179,13 @@ public class BootStrapper extends ProgressEventNotifierSupport
     }
 
     private static Collection<Token> getSpecifiedTokens(final TokenMetadata metadata,
-                                                                Collection<String> initialTokens)
+                                                        Collection<String> initialTokens)
     {
         logger.debug("tokens manually specified as {}",  initialTokens);
         List<Token> tokens = new ArrayList<>(initialTokens.size());
         for (String tokenString : initialTokens)
         {
-            Token token = StorageService.getPartitioner().getTokenFactory().fromString(tokenString);
+            Token token = metadata.partitioner.getTokenFactory().fromString(tokenString);
             if (metadata.getEndpoint(token) != null)
                 throw new ConfigurationException("Bootstrapping to existing token " + tokenString + " is not allowed (decommission/removenode the old node first).");
             tokens.add(token);
@@ -202,8 +202,8 @@ public class BootStrapper extends ProgressEventNotifierSupport
         if (ks == null)
             throw new ConfigurationException("Problem opening token allocation keyspace " + allocationKeyspace);
         AbstractReplicationStrategy rs = ks.getReplicationStrategy();
-        
-        return TokenAllocation.allocateTokens(metadata, rs, StorageService.getPartitioner(), address, numTokens);
+
+        return TokenAllocation.allocateTokens(metadata, rs, address, numTokens);
     }
 
     public static Collection<Token> getRandomTokens(TokenMetadata metadata, int numTokens)
@@ -211,7 +211,7 @@ public class BootStrapper extends ProgressEventNotifierSupport
         Set<Token> tokens = new HashSet<>(numTokens);
         while (tokens.size() < numTokens)
         {
-            Token token = StorageService.getPartitioner().getRandomToken();
+            Token token = metadata.partitioner.getRandomToken();
             if (metadata.getEndpoint(token) == null)
                 tokens.add(token);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
index d7139d0..46872c1 100644
--- a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
@@ -300,4 +300,9 @@ public class ByteOrderedPartitioner implements IPartitioner
     {
         return BytesType.instance;
     }
+
+    public AbstractType<?> partitionOrdering()
+    {
+        return BytesType.instance;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/IPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java b/src/java/org/apache/cassandra/dht/IPartitioner.java
index b22da66..e0a08dc 100644
--- a/src/java/org/apache/cassandra/dht/IPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/IPartitioner.java
@@ -78,4 +78,10 @@ public interface IPartitioner
     public Map<Token, Float> describeOwnership(List<Token> sortedTokens);
 
     public AbstractType<?> getTokenValidator();
+
+    /**
+     * Abstract type that orders the same way as DecoratedKeys provided by this partitioner.
+     * Used by secondary indices.
+     */
+    public AbstractType<?> partitionOrdering();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/LocalPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
index 01dc75e..2a5a16e 100644
--- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
@@ -84,6 +84,11 @@ public class LocalPartitioner implements IPartitioner
         return comparator;
     }
 
+    public AbstractType<?> partitionOrdering()
+    {
+        return comparator;
+    }
+
     public class LocalToken extends ComparableObjectToken<ByteBuffer>
     {
         static final long serialVersionUID = 8437543776403014875L;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
index 003879c..d68be3f 100644
--- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
+++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.PreHashedDecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.PartitionerDefinedOrder;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -45,6 +46,7 @@ public class Murmur3Partitioner implements IPartitioner
     private static final int HEAP_SIZE = (int) ObjectSizes.measureDeep(MINIMUM);
 
     public static final Murmur3Partitioner instance = new Murmur3Partitioner();
+    public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance);
 
     public DecoratedKey decorateKey(ByteBuffer key)
     {
@@ -288,4 +290,9 @@ public class Murmur3Partitioner implements IPartitioner
     {
         return LongType.instance;
     }
+
+    public AbstractType<?> partitionOrdering()
+    {
+        return partitionOrdering;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
index ae0326f..45c2cfa 100644
--- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
@@ -240,4 +240,9 @@ public class OrderPreservingPartitioner implements IPartitioner
     {
         return UTF8Type.instance;
     }
+
+    public AbstractType<?> partitionOrdering()
+    {
+        return UTF8Type.instance;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
index 71a0a99..b0dea01 100644
--- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.PartitionerDefinedOrder;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.GuidGenerator;
@@ -47,6 +48,7 @@ public class RandomPartitioner implements IPartitioner
     private static final int HEAP_SIZE = (int) ObjectSizes.measureDeep(new BigIntegerToken(FBUtilities.hashToBigInteger(ByteBuffer.allocate(1))));
 
     public static final RandomPartitioner instance = new RandomPartitioner();
+    public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance);
 
     public DecoratedKey decorateKey(ByteBuffer key)
     {
@@ -196,4 +198,9 @@ public class RandomPartitioner implements IPartitioner
     {
         return IntegerType.instance;
     }
+
+    public AbstractType<?> partitionOrdering()
+    {
+        return partitionOrdering;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 68c8a11..e7624c3 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -333,7 +333,7 @@ public class RangeStreamer
             Collection<Range<Token>> ranges = entry.getValue().getValue();
 
             // filter out already streamed ranges
-            Set<Range<Token>> availableRanges = stateStore.getAvailableRanges(keyspace, StorageService.getPartitioner());
+            Set<Range<Token>> availableRanges = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner);
             if (ranges.removeAll(availableRanges))
             {
                 logger.info("Some ranges of {} are already available. Skipping streaming those ranges.", availableRanges);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
index dd3a02b..b4281ce 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
@@ -33,7 +33,6 @@ import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
@@ -49,12 +48,11 @@ public class TokenAllocation
 
     public static Collection<Token> allocateTokens(final TokenMetadata tokenMetadata,
                                                    final AbstractReplicationStrategy rs,
-                                                   final IPartitioner partitioner,
                                                    final InetAddress endpoint,
                                                    int numTokens)
     {
         StrategyAdapter strategy = getStrategy(tokenMetadata, rs, endpoint);
-        Collection<Token> tokens = create(tokenMetadata, strategy, partitioner).addUnit(endpoint, numTokens);
+        Collection<Token> tokens = create(tokenMetadata, strategy).addUnit(endpoint, numTokens);
         tokens = adjustForCrossDatacenterClashes(tokenMetadata, strategy, tokens);
 
         if (logger.isWarnEnabled())
@@ -141,7 +139,7 @@ public class TokenAllocation
         return stat;
     }
 
-    static TokenAllocator<InetAddress> create(TokenMetadata tokenMetadata, StrategyAdapter strategy, IPartitioner partitioner)
+    static TokenAllocator<InetAddress> create(TokenMetadata tokenMetadata, StrategyAdapter strategy)
     {
         NavigableMap<Token, InetAddress> sortedTokens = new TreeMap<>();
         for (Map.Entry<Token, InetAddress> en : tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap().entrySet())
@@ -149,7 +147,7 @@ public class TokenAllocation
             if (strategy.inAllocationRing(en.getValue()))
                 sortedTokens.put(en.getKey(), en.getValue());
         }
-        return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, partitioner);
+        return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, tokenMetadata.partitioner);
     }
 
     interface StrategyAdapter extends ReplicationStrategy<InetAddress>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index e61a35a..5fa402a 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -584,7 +584,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                 JVMStabilityInspector.inspectThrowable(th);
                 // TODO this is broken
                 logger.warn("Unable to calculate tokens for {}.  Will use a random one", address);
-                tokens = Collections.singletonList(StorageService.getPartitioner().getRandomToken());
+                tokens = Collections.singletonList(StorageService.instance.getTokenMetadata().partitioner.getRandomToken());
             }
             int generation = epState.getHeartBeatState().getGeneration();
             int heartbeat = epState.getHeartBeatState().getHeartBeatVersion();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index acc9141..f4b4da8 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.Closeable;
+import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -30,7 +31,6 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.Pair;
@@ -46,12 +46,11 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
     protected SSTableFormat.Type formatType = DatabaseDescriptor.getSSTableFormat();
     protected static AtomicInteger generation = new AtomicInteger(0);
 
-    protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns)
+    protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns)
     {
         this.metadata = metadata;
         this.directory = directory;
         this.columns = columns;
-        DatabaseDescriptor.setPartitioner(partitioner);
     }
 
     protected void setSSTableFormatType(SSTableFormat.Type type)
@@ -103,6 +102,11 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
         return maxGen;
     }
 
+    PartitionUpdate getUpdateFor(ByteBuffer key) throws IOException
+    {
+        return getUpdateFor(metadata.decorateKey(key));
+    }
+
     /**
      * Returns a PartitionUpdate suitable to write on this writer for the provided key.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 43e214b..7ae5651 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -77,6 +77,9 @@ public class CQLSSTableWriter implements Closeable
     static
     {
         Config.setClientMode(true);
+        // Partitioner is not set in client mode.
+        if (DatabaseDescriptor.getPartitioner() == null)
+            DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
     }
 
     private final AbstractSSTableSimpleWriter writer;
@@ -219,10 +222,7 @@ public class CQLSSTableWriter implements Closeable
         try
         {
             for (ByteBuffer key : keys)
-            {
-                DecoratedKey dk = DatabaseDescriptor.getPartitioner().decorateKey(key);
-                insert.addUpdateForKey(writer.getUpdateFor(dk), clustering, params);
-            }
+                insert.addUpdateForKey(writer.getUpdateFor(key), clustering, params);
             return this;
         }
         catch (SSTableSimpleUnsortedWriter.SyncException e)
@@ -277,7 +277,6 @@ public class CQLSSTableWriter implements Closeable
     public static class Builder
     {
         private File directory;
-        private IPartitioner partitioner = Murmur3Partitioner.instance;
 
         protected SSTableFormat.Type formatType = null;
 
@@ -402,7 +401,7 @@ public class CQLSSTableWriter implements Closeable
          */
         public Builder withPartitioner(IPartitioner partitioner)
         {
-            this.partitioner = partitioner;
+            this.schema = schema.copy(partitioner);
             return this;
         }
 
@@ -511,8 +510,8 @@ public class CQLSSTableWriter implements Closeable
                 throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()");
 
             AbstractSSTableSimpleWriter writer = sorted
-                                               ? new SSTableSimpleWriter(directory, schema, partitioner, insert.updatedColumns())
-                                               : new SSTableSimpleUnsortedWriter(directory, schema, partitioner, insert.updatedColumns(), bufferSizeInMB);
+                                               ? new SSTableSimpleWriter(directory, schema, insert.updatedColumns())
+                                               : new SSTableSimpleUnsortedWriter(directory, schema, insert.updatedColumns(), bufferSizeInMB);
 
             if (formatType != null)
                 writer.setSSTableFormatType(formatType);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
index 5de2452..8fb300b 100644
--- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
@@ -23,8 +23,10 @@ import java.io.IOException;
 
 import com.google.common.collect.AbstractIterator;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -80,11 +82,13 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
     }
 
     private final In in;
+    private final IPartitioner partitioner;
 
 
-    public KeyIterator(Descriptor desc)
+    public KeyIterator(Descriptor desc, CFMetaData metadata)
     {
         in = new In(new File(desc.filenameFor(Component.PRIMARY_INDEX)));
+        partitioner = metadata.partitioner;
     }
 
     protected DecoratedKey computeNext()
@@ -94,7 +98,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
             if (in.isEOF())
                 return endOfData();
 
-            DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in.get()));
+            DecoratedKey key = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in.get()));
             RowIndexEntry.Serializer.skip(in.get()); // skip remainder of the entry
             return key;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
index f1e01f2..bbc56cc 100644
--- a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
@@ -40,7 +40,7 @@ public class ReducingKeyIterator implements CloseableIterator<DecoratedKey>
     {
         iters = new ArrayList<>(sstables.size());
         for (SSTableReader sstable : sstables)
-            iters.add(new KeyIterator(sstable.descriptor));
+            iters.add(new KeyIterator(sstable.descriptor, sstable.metadata));
     }
 
     private void maybeInit()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 516534d..b86d9b4 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.*;
+import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -63,31 +64,29 @@ public abstract class SSTable
     public final Descriptor descriptor;
     protected final Set<Component> components;
     public final CFMetaData metadata;
-    public final IPartitioner partitioner;
     public final boolean compression;
 
     public DecoratedKey first;
     public DecoratedKey last;
 
-    protected SSTable(Descriptor descriptor, CFMetaData metadata, IPartitioner partitioner)
+    protected SSTable(Descriptor descriptor, CFMetaData metadata)
     {
-        this(descriptor, new HashSet<>(), metadata, partitioner);
+        this(descriptor, new HashSet<>(), metadata);
     }
 
-    protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner)
+    protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata)
     {
         // In almost all cases, metadata shouldn't be null, but allowing null allows to create a mostly functional SSTable without
         // full schema definition. SSTableLoader use that ability
         assert descriptor != null;
         assert components != null;
-        assert partitioner != null;
+        assert metadata != null;
 
         this.descriptor = descriptor;
         Set<Component> dataComponents = new HashSet<>(components);
         this.compression = dataComponents.contains(Component.COMPRESSION_INFO);
         this.components = new CopyOnWriteArraySet<>(dataComponents);
         this.metadata = metadata;
-        this.partitioner = partitioner;
     }
 
     /**
@@ -121,6 +120,16 @@ public abstract class SSTable
         return true;
     }
 
+    public IPartitioner getPartitioner()
+    {
+        return metadata.partitioner;
+    }
+
+    public DecoratedKey decorateKey(ByteBuffer key)
+    {
+        return getPartitioner().decorateKey(key);
+    }
+
     /**
      * If the given @param key occupies only part of a larger buffer, allocate a new buffer that is only
      * as large as necessary.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index f25d3ff..20c3962 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -133,7 +133,7 @@ public class SSTableLoader implements StreamEventHandler
                     // To conserve memory, open SSTableReaders without bloom filters and discard
                     // the index summary after calculating the file sections to stream and the estimated
                     // number of keys for each endpoint. See CASSANDRA-5555 for details.
-                    SSTableReader sstable = SSTableReader.openForBatch(desc, components, metadata, client.getPartitioner());
+                    SSTableReader sstable = SSTableReader.openForBatch(desc, components, metadata);
                     sstables.add(sstable);
 
                     // calculate the sstable sections to stream as well as the estimated number of
@@ -252,7 +252,6 @@ public class SSTableLoader implements StreamEventHandler
     public static abstract class Client
     {
         private final Map<InetAddress, Collection<Range<Token>>> endpointToRanges = new HashMap<>();
-        private IPartitioner partitioner;
 
         /**
          * Initialize the client.
@@ -299,23 +298,6 @@ public class SSTableLoader implements StreamEventHandler
             return endpointToRanges;
         }
 
-        protected void setPartitioner(String partclass) throws ConfigurationException
-        {
-            setPartitioner(FBUtilities.newPartitioner(partclass));
-        }
-
-        protected void setPartitioner(IPartitioner partitioner)
-        {
-            this.partitioner = partitioner;
-            // the following is still necessary since Range/Token reference partitioner through StorageService.getPartitioner
-            DatabaseDescriptor.setPartitioner(partitioner);
-        }
-
-        public IPartitioner getPartitioner()
-        {
-            return partitioner;
-        }
-
         protected void addRangeForEndpoint(Range<Token> range, InetAddress endpoint)
         {
             Collection<Range<Token>> ranges = endpointToRanges.get(endpoint);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index a70b92f..f4b9adf 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredSerializer;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 /**
@@ -60,9 +59,9 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
     private final BlockingQueue<Buffer> writeQueue = new SynchronousQueue<Buffer>();
     private final DiskWriter diskWriter = new DiskWriter();
 
-    SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns, long bufferSizeInMB)
+    SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, PartitionColumns columns, long bufferSizeInMB)
     {
-        super(directory, metadata, partitioner, columns);
+        super(directory, metadata, columns);
         this.bufferSize = bufferSizeInMB * 1024L * 1024L;
         this.header = new SerializationHeader(metadata, columns, EncodingStats.NO_STATS);
         diskWriter.start();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index b22a048..45722cd 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -44,9 +44,9 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
 
     private SSTableTxnWriter writer;
 
-    protected SSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns)
+    protected SSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns)
     {
-        super(directory, metadata, partitioner, columns);
+        super(directory, metadata, columns);
     }
 
     private SSTableTxnWriter getOrCreateWriter()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 6d39d2d..5ceced5 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -27,10 +27,11 @@ import java.util.concurrent.atomic.AtomicLong;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Ordering;
 import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
@@ -42,27 +43,23 @@ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.lifecycle.TransactionLogs;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.*;
-import org.apache.cassandra.io.sstable.format.big.BigTableWriter;
 import org.apache.cassandra.io.sstable.metadata.*;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.CacheService;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.concurrent.SelfRefCounted;
 
@@ -351,21 +348,18 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
     public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException
     {
-        IPartitioner p = desc.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR)
-                ? new LocalPartitioner(metadata.getKeyValidator())
-                : StorageService.getPartitioner();
-        return open(desc, componentsFor(desc), metadata, p);
+        return open(desc, componentsFor(desc), metadata);
     }
 
-    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
+    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
     {
-        return open(descriptor, components, metadata, partitioner, true, true);
+        return open(descriptor, components, metadata, true, true);
     }
 
     // use only for offline or "Standalone" operations
     public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, ColumnFamilyStore cfs) throws IOException
     {
-        return open(descriptor, components, cfs.metadata, cfs.partitioner, false, false); // do not track hotness
+        return open(descriptor, components, cfs.metadata, false, false); // do not track hotness
     }
 
     /**
@@ -374,11 +368,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
      * @param descriptor
      * @param components
      * @param metadata
-     * @param partitioner
      * @return opened SSTableReader
      * @throws IOException
      */
-    public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
+    public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
     {
         // Minimum components without which we can't do anything
         assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
@@ -394,7 +387,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         // Check if sstable is created using same partitioner.
         // Partitioner can be null, which indicates older version of sstable or no stats available.
         // In that case, we skip the check.
-        String partitionerName = partitioner.getClass().getCanonicalName();
+        String partitionerName = metadata.partitioner.getClass().getCanonicalName();
         if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
         {
             logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
@@ -406,7 +399,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         SSTableReader sstable = internalOpen(descriptor,
                                              components,
                                              metadata,
-                                             partitioner,
                                              System.currentTimeMillis(),
                                              statsMetadata,
                                              OpenReason.NORMAL,
@@ -431,7 +423,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     public static SSTableReader open(Descriptor descriptor,
                                       Set<Component> components,
                                       CFMetaData metadata,
-                                      IPartitioner partitioner,
                                       boolean validate,
                                       boolean trackHotness) throws IOException
     {
@@ -452,7 +443,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         // Check if sstable is created using same partitioner.
         // Partitioner can be null, which indicates older version of sstable or no stats available.
         // In that case, we skip the check.
-        String partitionerName = partitioner.getClass().getCanonicalName();
+        String partitionerName = metadata.partitioner.getClass().getCanonicalName();
         if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
         {
             logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
@@ -464,7 +455,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         SSTableReader sstable = internalOpen(descriptor,
                                              components,
                                              metadata,
-                                             partitioner,
                                              System.currentTimeMillis(),
                                              statsMetadata,
                                              OpenReason.NORMAL,
@@ -502,8 +492,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     }
 
     public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
-                                                    final CFMetaData metadata,
-                                                    final IPartitioner partitioner)
+                                                    final CFMetaData metadata)
     {
         final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>();
 
@@ -517,7 +506,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                     SSTableReader sstable;
                     try
                     {
-                        sstable = open(entry.getKey(), entry.getValue(), metadata, partitioner);
+                        sstable = open(entry.getKey(), entry.getValue(), metadata);
                     }
                     catch (CorruptSSTableException ex)
                     {
@@ -562,7 +551,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     public static SSTableReader internalOpen(Descriptor desc,
                                       Set<Component> components,
                                       CFMetaData metadata,
-                                      IPartitioner partitioner,
                                       SegmentedFile ifile,
                                       SegmentedFile dfile,
                                       IndexSummary isummary,
@@ -572,9 +560,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                                       OpenReason openReason,
                                       SerializationHeader header)
     {
-        assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
+        assert desc != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
 
-        SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
+        SSTableReader reader = internalOpen(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header);
 
         reader.bf = bf;
         reader.ifile = ifile;
@@ -589,7 +577,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     private static SSTableReader internalOpen(final Descriptor descriptor,
                                             Set<Component> components,
                                             CFMetaData metadata,
-                                            IPartitioner partitioner,
                                             Long maxDataAge,
                                             StatsMetadata sstableMetadata,
                                             OpenReason openReason,
@@ -597,19 +584,18 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     {
         Factory readerFactory = descriptor.getFormat().getReaderFactory();
 
-        return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
+        return readerFactory.open(descriptor, components, metadata, maxDataAge, sstableMetadata, openReason, header);
     }
 
     protected SSTableReader(final Descriptor desc,
                             Set<Component> components,
                             CFMetaData metadata,
-                            IPartitioner partitioner,
                             long maxDataAge,
                             StatsMetadata sstableMetadata,
                             OpenReason openReason,
                             SerializationHeader header)
     {
-        super(desc, components, metadata, partitioner);
+        super(desc, components, metadata);
         this.sstableMetadata = sstableMetadata;
         this.header = header;
         this.maxDataAge = maxDataAge;
@@ -814,7 +800,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                 {
                     ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
                     RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex);
-                    DecoratedKey decoratedKey = partitioner.decorateKey(key);
+                    DecoratedKey decoratedKey = decorateKey(key);
                     if (first == null)
                         first = decoratedKey;
                     last = decoratedKey;
@@ -832,7 +818,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                 }
 
                 if (!summaryLoaded)
-                    indexSummary = summaryBuilder.build(partitioner);
+                    indexSummary = summaryBuilder.build(getPartitioner());
             }
         }
 
@@ -862,10 +848,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         {
             iStream = new DataInputStream(new FileInputStream(summariesFile));
             indexSummary = IndexSummary.serializer.deserialize(
-                    iStream, partitioner, descriptor.version.hasSamplingLevel(),
+                    iStream, getPartitioner(), descriptor.version.hasSamplingLevel(),
                     metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
-            first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
-            last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+            first = decorateKey(ByteBufferUtil.readWithLength(iStream));
+            last = decorateKey(ByteBufferUtil.readWithLength(iStream));
             ibuilder.deserializeBounds(iStream);
             dbuilder.deserializeBounds(iStream);
         }
@@ -1064,7 +1050,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         SSTableReader replacement = internalOpen(descriptor,
                                                  components,
                                                  metadata,
-                                                 partitioner,
                                                  ifile != null ? ifile.sharedCopy() : null,
                                                  dfile.sharedCopy(),
                                                  newSummary,
@@ -1168,7 +1153,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             else if (samplingLevel < indexSummary.getSamplingLevel())
             {
                 // we can use the existing index summary to make a smaller one
-                newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
+                newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, getPartitioner());
 
                 try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
                     SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression))
@@ -1203,11 +1188,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                 long indexPosition;
                 while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
                 {
-                    summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
+                    summaryBuilder.maybeAddEntry(decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
                     RowIndexEntry.Serializer.skip(primaryIndex);
                 }
 
-                return summaryBuilder.build(partitioner);
+                return summaryBuilder.build(getPartitioner());
             }
         }
         finally
@@ -1304,8 +1289,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
         CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata();
 
-        //We need the parent cf metadata
-        String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
+        // We need the parent cf metadata
+        String cfName = metadata.isIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
         cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName));
 
         return cmd;
@@ -1477,7 +1462,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                     public DecoratedKey next()
                     {
                         byte[] bytes = indexSummary.getKey(idx++);
-                        return partitioner.decorateKey(ByteBuffer.wrap(bytes));
+                        return decorateKey(ByteBuffer.wrap(bytes));
                     }
 
                     public void remove()
@@ -1619,7 +1604,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                 while (!in.isEOF())
                 {
                     ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
-                    DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
+                    DecoratedKey indexDecoratedKey = decorateKey(indexKey);
                     if (indexDecoratedKey.compareTo(token) > 0)
                         return indexDecoratedKey;
 
@@ -2300,7 +2285,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         public abstract SSTableReader open(final Descriptor descriptor,
                                            Set<Component> components,
                                            CFMetaData metadata,
-                                           IPartitioner partitioner,
                                            Long maxDataAge,
                                            StatsMetadata sstableMetadata,
                                            OpenReason openReason,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 08a9dcc..fa691b8 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -27,13 +27,11 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
@@ -74,11 +72,10 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                             long keyCount, 
                             long repairedAt, 
                             CFMetaData metadata, 
-                            IPartitioner partitioner, 
                             MetadataCollector metadataCollector, 
                             SerializationHeader header)
     {
-        super(descriptor, components(metadata), metadata, partitioner);
+        super(descriptor, components(metadata), metadata);
         this.keyCount = keyCount;
         this.repairedAt = repairedAt;
         this.metadataCollector = metadataCollector;
@@ -90,19 +87,18 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                        Long keyCount,
                                        Long repairedAt,
                                        CFMetaData metadata,
-                                       IPartitioner partitioner,
                                        MetadataCollector metadataCollector,
                                        SerializationHeader header,
                                        LifecycleTransaction txn)
     {
         Factory writerFactory = descriptor.getFormat().getWriterFactory();
-        return writerFactory.open(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header, txn);
+        return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn);
     }
 
     public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
     {
         CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
-        return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, DatabaseDescriptor.getPartitioner(), header, txn);
+        return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, txn);
     }
 
     public static SSTableWriter create(CFMetaData metadata,
@@ -110,12 +106,11 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                        long keyCount,
                                        long repairedAt,
                                        int sstableLevel,
-                                       IPartitioner partitioner,
                                        SerializationHeader header,
                                        LifecycleTransaction txn)
     {
         MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
-        return create(descriptor, keyCount, repairedAt, metadata, partitioner, collector, header, txn);
+        return create(descriptor, keyCount, repairedAt, metadata, collector, header, txn);
     }
 
     public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header,LifecycleTransaction txn)
@@ -255,7 +250,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
 
     protected Map<MetadataType, MetadataComponent> finalizeMetadata()
     {
-        return metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
+        return metadataCollector.finalizeMetadata(getPartitioner().getClass().getCanonicalName(),
                                                   metadata.getBloomFilterFpChance(),
                                                   repairedAt,
                                                   header);
@@ -287,7 +282,6 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                            long keyCount,
                                            long repairedAt,
                                            CFMetaData metadata,
-                                           IPartitioner partitioner,
                                            MetadataCollector metadataCollector,
                                            SerializationHeader header,
                                            LifecycleTransaction txn);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index a072d4d..860cd9f 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -23,7 +23,6 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
@@ -86,21 +85,20 @@ public class BigFormat implements SSTableFormat
                                   long keyCount, 
                                   long repairedAt, 
                                   CFMetaData metadata, 
-                                  IPartitioner partitioner, 
                                   MetadataCollector metadataCollector, 
                                   SerializationHeader header,
                                   LifecycleTransaction txn)
         {
-            return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header, txn);
+            return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn);
         }
     }
 
     static class ReaderFactory extends SSTableReader.Factory
     {
         @Override
-        public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header)
+        public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header)
         {
-            return new BigTableReader(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
+            return new BigTableReader(descriptor, components, metadata, maxDataAge, sstableMetadata, openReason, header);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index b539c79..87608fd 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -52,9 +52,9 @@ public class BigTableReader extends SSTableReader
 {
     private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class);
 
-    BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header)
+    BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header)
     {
-        super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
+        super(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header);
     }
 
     public SliceableUnfilteredRowIterator iterator(DecoratedKey key, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
@@ -201,7 +201,7 @@ public class BigTableReader extends SSTableReader
                     }
                     else
                     {
-                        DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
+                        DecoratedKey indexDecoratedKey = decorateKey(indexKey);
                         int comparison = indexDecoratedKey.compareTo(key);
                         int v = op.apply(comparison);
                         opSatisfied = (v == 0);
@@ -227,7 +227,7 @@ public class BigTableReader extends SSTableReader
                                 // expensive sanity check!  see CASSANDRA-4687
                                 try (FileDataInput fdi = dfile.getSegment(indexEntry.position))
                                 {
-                                    DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi));
+                                    DecoratedKey keyInDisk = decorateKey(ByteBufferUtil.readWithShortLength(fdi));
                                     if (!keyInDisk.equals(key))
                                         throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
                                 }