You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2015/07/31 19:25:10 UTC

[3/4] cassandra git commit: Revert "Stop accessing the partitioner directly via StorageService"

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 9221090..94031ab 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -42,12 +42,15 @@ import org.apache.cassandra.db.index.keys.KeysIndex;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.LocalByPartionerType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
+
 import org.apache.cassandra.utils.concurrent.Refs;
 
 /**
@@ -76,6 +79,10 @@ public abstract class SecondaryIndex
      */
     public static final String INDEX_ENTRIES_OPTION_NAME = "index_keys_and_values";
 
+    public static final AbstractType<?> keyComparator = StorageService.getPartitioner().preservesOrder()
+                                                      ? BytesType.instance
+                                                      : new LocalByPartionerType(StorageService.getPartitioner());
+
     /**
      * Base CF that has many indexes
      */
@@ -296,7 +303,7 @@ public abstract class SecondaryIndex
      */
     public DecoratedKey getIndexKeyFor(ByteBuffer value)
     {
-        return getIndexCfs().decorateKey(value);
+        return getIndexCfs().partitioner.decorateKey(value);
     }
 
     /**
@@ -374,20 +381,11 @@ public abstract class SecondaryIndex
      */
     public static CFMetaData newIndexMetadata(CFMetaData baseMetadata, ColumnDefinition def)
     {
-        return newIndexMetadata(baseMetadata, def, def.type);
-    }
-
-    /**
-     * Create the index metadata for the index on a given column of a given table.
-     */
-    static CFMetaData newIndexMetadata(CFMetaData baseMetadata, ColumnDefinition def, AbstractType<?> comparator)
-    {
         if (def.getIndexType() == IndexType.CUSTOM)
             return null;
 
         CFMetaData.Builder builder = CFMetaData.Builder.create(baseMetadata.ksName, baseMetadata.indexColumnFamilyName(def))
                                                        .withId(baseMetadata.cfId)
-                                                       .withPartitioner(new LocalPartitioner(comparator))
                                                        .addPartitionKey(def.name, def.type);
 
         if (def.getIndexType() == IndexType.COMPOSITES)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index 29f235c..42861c5 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -101,7 +101,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
 
     protected static void addGenericClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
     {
-        indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
+        indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator);
         for (ColumnDefinition def : baseMetadata.clusteringColumns())
             indexMetadata.addClusteringColumn(def.name, def.type);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
index cd4aff9..6529ad9 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 /**
@@ -47,7 +48,7 @@ public class CompositesIndexOnClusteringKey extends CompositesIndex
 {
     public static void addClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
     {
-        indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
+        indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator);
 
         List<ColumnDefinition> cks = baseMetadata.clusteringColumns();
         for (int i = 0; i < columnDef.position(); i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index b76bf7e..d322faf 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -111,7 +111,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                 // *data* for a given partition.
                 BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(baseCfs.getComparator());
                 List<CompositesIndex.IndexedEntry> entries = new ArrayList<>();
-                DecoratedKey partitionKey = baseCfs.decorateKey(nextEntry.indexedKey);
+                DecoratedKey partitionKey = baseCfs.partitioner.decorateKey(nextEntry.indexedKey);
 
                 while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey))
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
index 478559a..7930bd6 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
@@ -23,6 +23,7 @@ import java.util.Set;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
@@ -41,7 +42,7 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
 {
     public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition cfDef)
     {
-        indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
+        indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index 53a9b4a..bcaf70b 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -84,7 +84,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
                 while (next == null && indexHits.hasNext())
                 {
                     Row hit = indexHits.next();
-                    DecoratedKey key = baseCfs.decorateKey(hit.clustering().get(0));
+                    DecoratedKey key = baseCfs.partitioner.decorateKey(hit.clustering().get(0));
 
                     SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
                                                                                            baseCfs.metadata,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java b/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
new file mode 100644
index 0000000..e02ba3c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.marshal;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.serializers.TypeSerializer;
+import org.apache.cassandra.serializers.MarshalException;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/** for sorting columns representing row keys in the row ordering as determined by a partitioner.
+ * Not intended for user-defined CFs, and will in fact error out if used with such. */
+public class LocalByPartionerType extends AbstractType<ByteBuffer>
+{
+    private final IPartitioner partitioner;
+
+    public LocalByPartionerType(IPartitioner partitioner)
+    {
+        this.partitioner = partitioner;
+    }
+
+    public static LocalByPartionerType getInstance(TypeParser parser)
+    {
+        return new LocalByPartionerType(StorageService.getPartitioner());
+    }
+
+    @Override
+    public ByteBuffer compose(ByteBuffer bytes)
+    {
+        throw new UnsupportedOperationException("You can't do this with a local partitioner.");
+    }
+
+    @Override
+    public ByteBuffer decompose(ByteBuffer bytes)
+    {
+        throw new UnsupportedOperationException("You can't do this with a local partitioner.");
+    }
+
+    public String getString(ByteBuffer bytes)
+    {
+        return ByteBufferUtil.bytesToHex(bytes);
+    }
+
+    public ByteBuffer fromString(String source)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Term fromJSONObject(Object parsed)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String toJSONString(ByteBuffer buffer, int protocolVersion)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public int compare(ByteBuffer o1, ByteBuffer o2)
+    {
+        // o1 and o2 can be empty so we need to use PartitionPosition, not DecoratedKey
+        return PartitionPosition.ForKey.get(o1, partitioner).compareTo(PartitionPosition.ForKey.get(o2, partitioner));
+    }
+
+    @Override
+    public void validate(ByteBuffer bytes) throws MarshalException
+    {
+        throw new IllegalStateException("You shouldn't be validating this.");
+    }
+
+    public TypeSerializer<ByteBuffer> getSerializer()
+    {
+        throw new UnsupportedOperationException("You can't do this with a local partitioner.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java b/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java
deleted file mode 100644
index efaea53..0000000
--- a/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.marshal;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.cql3.Term;
-import org.apache.cassandra.db.PartitionPosition;
-import org.apache.cassandra.serializers.TypeSerializer;
-import org.apache.cassandra.serializers.MarshalException;
-
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-/** for sorting columns representing row keys in the row ordering as determined by a partitioner.
- * Not intended for user-defined CFs, and will in fact error out if used with such. */
-public class PartitionerDefinedOrder extends AbstractType<ByteBuffer>
-{
-    private final IPartitioner partitioner;
-
-    public PartitionerDefinedOrder(IPartitioner partitioner)
-    {
-        this.partitioner = partitioner;
-    }
-
-    @Override
-    public ByteBuffer compose(ByteBuffer bytes)
-    {
-        throw new UnsupportedOperationException("You can't do this with a local partitioner.");
-    }
-
-    @Override
-    public ByteBuffer decompose(ByteBuffer bytes)
-    {
-        throw new UnsupportedOperationException("You can't do this with a local partitioner.");
-    }
-
-    public String getString(ByteBuffer bytes)
-    {
-        return ByteBufferUtil.bytesToHex(bytes);
-    }
-
-    public ByteBuffer fromString(String source)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Term fromJSONObject(Object parsed)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public String toJSONString(ByteBuffer buffer, int protocolVersion)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public int compare(ByteBuffer o1, ByteBuffer o2)
-    {
-        // o1 and o2 can be empty so we need to use PartitionPosition, not DecoratedKey
-        return PartitionPosition.ForKey.get(o1, partitioner).compareTo(PartitionPosition.ForKey.get(o2, partitioner));
-    }
-
-    @Override
-    public void validate(ByteBuffer bytes) throws MarshalException
-    {
-        throw new IllegalStateException("You shouldn't be validating this.");
-    }
-
-    public TypeSerializer<ByteBuffer> getSerializer()
-    {
-        throw new UnsupportedOperationException("You can't do this with a local partitioner.");
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index 25d1887..e8ec4c0 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
@@ -43,6 +42,7 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Locks;
 import org.apache.cassandra.utils.memory.MemtableAllocator;
 import org.apache.cassandra.utils.memory.HeapAllocator;
+import org.apache.cassandra.service.StorageService;
 import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
 import static org.apache.cassandra.utils.btree.BTree.Dir.desc;
 
@@ -59,7 +59,7 @@ public class AtomicBTreePartition implements Partition
     private static final Logger logger = LoggerFactory.getLogger(AtomicBTreePartition.class);
 
     public static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreePartition(CFMetaData.createFake("keyspace", "table"),
-                                                                                       DatabaseDescriptor.getPartitioner().decorateKey(ByteBuffer.allocate(1)),
+                                                                                       StorageService.getPartitioner().decorateKey(ByteBuffer.allocate(1)),
                                                                                        null));
 
     // Reserved values for wasteTracker field. These values must not be consecutive (see avoidReservedValues)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index f2e0617..102008f 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -102,17 +102,6 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
         this(metadata, key, columns, Rows.EMPTY_STATIC_ROW, new ArrayList<>(initialRowCapacity), MutableDeletionInfo.live(), null, false, true);
     }
 
-    public PartitionUpdate(CFMetaData metadata,
-                           ByteBuffer key,
-                           PartitionColumns columns,
-                           int initialRowCapacity)
-    {
-        this(metadata,
-             metadata.decorateKey(key),
-             columns,
-             initialRowCapacity);
-    }
-
     /**
      * Creates a empty immutable partition update.
      *
@@ -145,7 +134,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
      * Creates an immutable partition update that contains a single row update.
      *
      * @param metadata the metadata for the created update.
-     * @param key the partition key for the partition to update.
+     * @param key the partition key for the partition that the created update should delete.
      * @param row the row for the update.
      *
      * @return the newly created partition update containing only {@code row}.
@@ -158,20 +147,6 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
     }
 
     /**
-     * Creates an immutable partition update that contains a single row update.
-     *
-     * @param metadata the metadata for the created update.
-     * @param key the partition key for the partition to update.
-     * @param row the row for the update.
-     *
-     * @return the newly created partition update containing only {@code row}.
-     */
-    public static PartitionUpdate singleRowUpdate(CFMetaData metadata, ByteBuffer key, Row row)
-    {
-        return singleRowUpdate(metadata, metadata.decorateKey(key), row);
-    }
-
-    /**
      * Turns the given iterator into an update.
      *
      * Warning: this method does not close the provided iterator, it is up to
@@ -287,21 +262,6 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
     }
 
     /**
-     * Creates a partition update that entirely deletes a given partition.
-     *
-     * @param metadata the metadata for the created update.
-     * @param key the partition key for the partition that the created update should delete.
-     * @param timestamp the timestamp for the deletion.
-     * @param nowInSec the current time in seconds to use as local deletion time for the partition deletion.
-     *
-     * @return the newly created partition deletion update.
-     */
-    public static PartitionUpdate fullPartitionDelete(CFMetaData metadata, ByteBuffer key, long timestamp, int nowInSec)
-    {
-        return fullPartitionDelete(metadata, metadata.decorateKey(key), timestamp, nowInSec);
-    }
-
-    /**
      * Merges the provided updates, yielding a new update that incorporates all those updates.
      *
      * @param updates the collection of updates to merge. This shouldn't be empty.
@@ -735,39 +695,29 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
             }
         }
 
-        public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException
-        {
-            if (version >= MessagingService.VERSION_30)
-            {
-                assert key == null; // key is only there for the old format
-                return deserialize30(in, version, flag);
-            }
-            else
-            {
-                assert key != null;
-                CFMetaData metadata = deserializeMetadata(in, version);
-                DecoratedKey dk = metadata.decorateKey(key);
-                return deserializePre30(in, version, flag, metadata, dk);
-            }
-        }
-
-        // Used to share same decorated key between updates.
         public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, DecoratedKey key) throws IOException
         {
-            if (version >= MessagingService.VERSION_30)
-            {
-                return deserialize30(in, version, flag);
-            }
-            else
+            if (version < MessagingService.VERSION_30)
             {
                 assert key != null;
-                CFMetaData metadata = deserializeMetadata(in, version);
-                return deserializePre30(in, version, flag, metadata, key);
+
+                // This is only used in mutation, and mutation have never allowed "null" column families
+                boolean present = in.readBoolean();
+                assert present;
+
+                CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+                LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version);
+                int size = in.readInt();
+                Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.deserializeCells(metadata, in, flag, size);
+                SerializationHelper helper = new SerializationHelper(metadata, version, flag);
+                try (UnfilteredRowIterator iterator = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, key, info, cells, false, helper))
+                {
+                    return PartitionUpdate.fromIterator(iterator);
+                }
             }
-        }
 
-        private static PartitionUpdate deserialize30(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
-        {
+            assert key == null; // key is only there for the old format
+
             CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
             UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, metadata, flag);
             if (header.isEmpty)
@@ -802,28 +752,6 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
                                        false);
         }
 
-        private static CFMetaData deserializeMetadata(DataInputPlus in, int version) throws IOException
-        {
-            // This is only used in mutation, and mutation have never allowed "null" column families
-            boolean present = in.readBoolean();
-            assert present;
-
-            CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
-            return metadata;
-        }
-
-        private static PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, CFMetaData metadata, DecoratedKey dk) throws IOException
-        {
-            LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version);
-            int size = in.readInt();
-            Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.deserializeCells(metadata, in, flag, size);
-            SerializationHelper helper = new SerializationHelper(metadata, version, flag);
-            try (UnfilteredRowIterator iterator = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, dk, info, cells, false, helper))
-            {
-                return PartitionUpdate.fromIterator(iterator);
-            }
-        }
-
         public long serializedSize(PartitionUpdate update, int version)
         {
             if (version < MessagingService.VERSION_30)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index 531bd26..b96e0b1 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
@@ -169,7 +170,7 @@ public class UnfilteredRowIteratorSerializer
 
     public Header deserializeHeader(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag) throws IOException
     {
-        DecoratedKey key = metadata.decorateKey(ByteBufferUtil.readWithVIntLength(in));
+        DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithVIntLength(in));
         int flags = in.readUnsignedByte();
         boolean isReversed = (flags & IS_REVERSED) != 0;
         if ((flags & IS_EMPTY) != 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/view/MaterializedView.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java
index f36abae..082c71d 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedView.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedView.java
@@ -303,10 +303,9 @@ public class MaterializedView
             partitionKey[i] = value;
         }
 
-        CFMetaData metadata = getViewCfs().metadata;
-        return metadata.decorateKey(CFMetaData.serializePartitionKey(metadata
-                                                                     .getKeyValidatorAsClusteringComparator()
-                                                                     .make(partitionKey)));
+        return getViewCfs().partitioner.decorateKey(CFMetaData.serializePartitionKey(getViewCfs().metadata
+                                                                                     .getKeyValidatorAsClusteringComparator()
+                                                                                     .make(partitionKey)));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/view/TemporalRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java
index d0ba5ea..53e4e91 100644
--- a/src/java/org/apache/cassandra/db/view/TemporalRow.java
+++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java
@@ -377,7 +377,7 @@ public class TemporalRow
             this.baseCfs = baseCfs;
             this.viewPrimaryKey = viewPrimaryKey;
             this.key = key;
-            this.dk = baseCfs.decorateKey(key);
+            this.dk = baseCfs.partitioner.decorateKey(key);
             this.clusteringToRow = new HashMap<>();
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 31fda34..2cb7f61 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -164,7 +164,7 @@ public class BootStrapper extends ProgressEventNotifierSupport
         // if user specified tokens, use those
         if (initialTokens.size() > 0)
             return getSpecifiedTokens(metadata, initialTokens);
-
+        
         int numTokens = DatabaseDescriptor.getNumTokens();
         if (numTokens < 1)
             throw new ConfigurationException("num_tokens must be >= 1");
@@ -179,13 +179,13 @@ public class BootStrapper extends ProgressEventNotifierSupport
     }
 
     private static Collection<Token> getSpecifiedTokens(final TokenMetadata metadata,
-                                                        Collection<String> initialTokens)
+                                                                Collection<String> initialTokens)
     {
         logger.debug("tokens manually specified as {}",  initialTokens);
         List<Token> tokens = new ArrayList<>(initialTokens.size());
         for (String tokenString : initialTokens)
         {
-            Token token = metadata.partitioner.getTokenFactory().fromString(tokenString);
+            Token token = StorageService.getPartitioner().getTokenFactory().fromString(tokenString);
             if (metadata.getEndpoint(token) != null)
                 throw new ConfigurationException("Bootstrapping to existing token " + tokenString + " is not allowed (decommission/removenode the old node first).");
             tokens.add(token);
@@ -202,8 +202,8 @@ public class BootStrapper extends ProgressEventNotifierSupport
         if (ks == null)
             throw new ConfigurationException("Problem opening token allocation keyspace " + allocationKeyspace);
         AbstractReplicationStrategy rs = ks.getReplicationStrategy();
-
-        return TokenAllocation.allocateTokens(metadata, rs, address, numTokens);
+        
+        return TokenAllocation.allocateTokens(metadata, rs, StorageService.getPartitioner(), address, numTokens);
     }
 
     public static Collection<Token> getRandomTokens(TokenMetadata metadata, int numTokens)
@@ -211,7 +211,7 @@ public class BootStrapper extends ProgressEventNotifierSupport
         Set<Token> tokens = new HashSet<>(numTokens);
         while (tokens.size() < numTokens)
         {
-            Token token = metadata.partitioner.getRandomToken();
+            Token token = StorageService.getPartitioner().getRandomToken();
             if (metadata.getEndpoint(token) == null)
                 tokens.add(token);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
index 46872c1..d7139d0 100644
--- a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
@@ -300,9 +300,4 @@ public class ByteOrderedPartitioner implements IPartitioner
     {
         return BytesType.instance;
     }
-
-    public AbstractType<?> partitionOrdering()
-    {
-        return BytesType.instance;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/dht/IPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java b/src/java/org/apache/cassandra/dht/IPartitioner.java
index e0a08dc..b22da66 100644
--- a/src/java/org/apache/cassandra/dht/IPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/IPartitioner.java
@@ -78,10 +78,4 @@ public interface IPartitioner
     public Map<Token, Float> describeOwnership(List<Token> sortedTokens);
 
     public AbstractType<?> getTokenValidator();
-
-    /**
-     * Abstract type that orders the same way as DecoratedKeys provided by this partitioner.
-     * Used by secondary indices.
-     */
-    public AbstractType<?> partitionOrdering();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/dht/LocalPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
index 2a5a16e..01dc75e 100644
--- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
@@ -84,11 +84,6 @@ public class LocalPartitioner implements IPartitioner
         return comparator;
     }
 
-    public AbstractType<?> partitionOrdering()
-    {
-        return comparator;
-    }
-
     public class LocalToken extends ComparableObjectToken<ByteBuffer>
     {
         static final long serialVersionUID = 8437543776403014875L;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
index d68be3f..003879c 100644
--- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
+++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.PreHashedDecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.PartitionerDefinedOrder;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -46,7 +45,6 @@ public class Murmur3Partitioner implements IPartitioner
     private static final int HEAP_SIZE = (int) ObjectSizes.measureDeep(MINIMUM);
 
     public static final Murmur3Partitioner instance = new Murmur3Partitioner();
-    public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance);
 
     public DecoratedKey decorateKey(ByteBuffer key)
     {
@@ -290,9 +288,4 @@ public class Murmur3Partitioner implements IPartitioner
     {
         return LongType.instance;
     }
-
-    public AbstractType<?> partitionOrdering()
-    {
-        return partitionOrdering;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
index 45c2cfa..ae0326f 100644
--- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
@@ -240,9 +240,4 @@ public class OrderPreservingPartitioner implements IPartitioner
     {
         return UTF8Type.instance;
     }
-
-    public AbstractType<?> partitionOrdering()
-    {
-        return UTF8Type.instance;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/dht/RandomPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
index b0dea01..71a0a99 100644
--- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
@@ -29,7 +29,6 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.IntegerType;
-import org.apache.cassandra.db.marshal.PartitionerDefinedOrder;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.GuidGenerator;
@@ -48,7 +47,6 @@ public class RandomPartitioner implements IPartitioner
     private static final int HEAP_SIZE = (int) ObjectSizes.measureDeep(new BigIntegerToken(FBUtilities.hashToBigInteger(ByteBuffer.allocate(1))));
 
     public static final RandomPartitioner instance = new RandomPartitioner();
-    public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance);
 
     public DecoratedKey decorateKey(ByteBuffer key)
     {
@@ -198,9 +196,4 @@ public class RandomPartitioner implements IPartitioner
     {
         return IntegerType.instance;
     }
-
-    public AbstractType<?> partitionOrdering()
-    {
-        return partitionOrdering;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index e7624c3..68c8a11 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -333,7 +333,7 @@ public class RangeStreamer
             Collection<Range<Token>> ranges = entry.getValue().getValue();
 
             // filter out already streamed ranges
-            Set<Range<Token>> availableRanges = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner);
+            Set<Range<Token>> availableRanges = stateStore.getAvailableRanges(keyspace, StorageService.getPartitioner());
             if (ranges.removeAll(availableRanges))
             {
                 logger.info("Some ranges of {} are already available. Skipping streaming those ranges.", availableRanges);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
index b4281ce..dd3a02b 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
@@ -33,6 +33,7 @@ import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
@@ -48,11 +49,12 @@ public class TokenAllocation
 
     public static Collection<Token> allocateTokens(final TokenMetadata tokenMetadata,
                                                    final AbstractReplicationStrategy rs,
+                                                   final IPartitioner partitioner,
                                                    final InetAddress endpoint,
                                                    int numTokens)
     {
         StrategyAdapter strategy = getStrategy(tokenMetadata, rs, endpoint);
-        Collection<Token> tokens = create(tokenMetadata, strategy).addUnit(endpoint, numTokens);
+        Collection<Token> tokens = create(tokenMetadata, strategy, partitioner).addUnit(endpoint, numTokens);
         tokens = adjustForCrossDatacenterClashes(tokenMetadata, strategy, tokens);
 
         if (logger.isWarnEnabled())
@@ -139,7 +141,7 @@ public class TokenAllocation
         return stat;
     }
 
-    static TokenAllocator<InetAddress> create(TokenMetadata tokenMetadata, StrategyAdapter strategy)
+    static TokenAllocator<InetAddress> create(TokenMetadata tokenMetadata, StrategyAdapter strategy, IPartitioner partitioner)
     {
         NavigableMap<Token, InetAddress> sortedTokens = new TreeMap<>();
         for (Map.Entry<Token, InetAddress> en : tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap().entrySet())
@@ -147,7 +149,7 @@ public class TokenAllocation
             if (strategy.inAllocationRing(en.getValue()))
                 sortedTokens.put(en.getKey(), en.getValue());
         }
-        return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, tokenMetadata.partitioner);
+        return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, partitioner);
     }
 
     interface StrategyAdapter extends ReplicationStrategy<InetAddress>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 5fa402a..e61a35a 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -584,7 +584,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                 JVMStabilityInspector.inspectThrowable(th);
                 // TODO this is broken
                 logger.warn("Unable to calculate tokens for {}.  Will use a random one", address);
-                tokens = Collections.singletonList(StorageService.instance.getTokenMetadata().partitioner.getRandomToken());
+                tokens = Collections.singletonList(StorageService.getPartitioner().getRandomToken());
             }
             int generation = epState.getHeartBeatState().getGeneration();
             int heartbeat = epState.getHeartBeatState().getHeartBeatVersion();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index f4b4da8..acc9141 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.Closeable;
-import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -31,6 +30,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.Pair;
@@ -46,11 +46,12 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
     protected SSTableFormat.Type formatType = DatabaseDescriptor.getSSTableFormat();
     protected static AtomicInteger generation = new AtomicInteger(0);
 
-    protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns)
+    protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns)
     {
         this.metadata = metadata;
         this.directory = directory;
         this.columns = columns;
+        DatabaseDescriptor.setPartitioner(partitioner);
     }
 
     protected void setSSTableFormatType(SSTableFormat.Type type)
@@ -102,11 +103,6 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
         return maxGen;
     }
 
-    PartitionUpdate getUpdateFor(ByteBuffer key) throws IOException
-    {
-        return getUpdateFor(metadata.decorateKey(key));
-    }
-
     /**
      * Returns a PartitionUpdate suitable to write on this writer for the provided key.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 7ae5651..43e214b 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -77,9 +77,6 @@ public class CQLSSTableWriter implements Closeable
     static
     {
         Config.setClientMode(true);
-        // Partitioner is not set in client mode.
-        if (DatabaseDescriptor.getPartitioner() == null)
-            DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
     }
 
     private final AbstractSSTableSimpleWriter writer;
@@ -222,7 +219,10 @@ public class CQLSSTableWriter implements Closeable
         try
         {
             for (ByteBuffer key : keys)
-                insert.addUpdateForKey(writer.getUpdateFor(key), clustering, params);
+            {
+                DecoratedKey dk = DatabaseDescriptor.getPartitioner().decorateKey(key);
+                insert.addUpdateForKey(writer.getUpdateFor(dk), clustering, params);
+            }
             return this;
         }
         catch (SSTableSimpleUnsortedWriter.SyncException e)
@@ -277,6 +277,7 @@ public class CQLSSTableWriter implements Closeable
     public static class Builder
     {
         private File directory;
+        private IPartitioner partitioner = Murmur3Partitioner.instance;
 
         protected SSTableFormat.Type formatType = null;
 
@@ -401,7 +402,7 @@ public class CQLSSTableWriter implements Closeable
          */
         public Builder withPartitioner(IPartitioner partitioner)
         {
-            this.schema = schema.copy(partitioner);
+            this.partitioner = partitioner;
             return this;
         }
 
@@ -510,8 +511,8 @@ public class CQLSSTableWriter implements Closeable
                 throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()");
 
             AbstractSSTableSimpleWriter writer = sorted
-                                               ? new SSTableSimpleWriter(directory, schema, insert.updatedColumns())
-                                               : new SSTableSimpleUnsortedWriter(directory, schema, insert.updatedColumns(), bufferSizeInMB);
+                                               ? new SSTableSimpleWriter(directory, schema, partitioner, insert.updatedColumns())
+                                               : new SSTableSimpleUnsortedWriter(directory, schema, partitioner, insert.updatedColumns(), bufferSizeInMB);
 
             if (formatType != null)
                 writer.setSSTableFormatType(formatType);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
index 8fb300b..5de2452 100644
--- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
@@ -23,10 +23,8 @@ import java.io.IOException;
 
 import com.google.common.collect.AbstractIterator;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -82,13 +80,11 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
     }
 
     private final In in;
-    private final IPartitioner partitioner;
 
 
-    public KeyIterator(Descriptor desc, CFMetaData metadata)
+    public KeyIterator(Descriptor desc)
     {
         in = new In(new File(desc.filenameFor(Component.PRIMARY_INDEX)));
-        partitioner = metadata.partitioner;
     }
 
     protected DecoratedKey computeNext()
@@ -98,7 +94,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
             if (in.isEOF())
                 return endOfData();
 
-            DecoratedKey key = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in.get()));
+            DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in.get()));
             RowIndexEntry.Serializer.skip(in.get()); // skip remainder of the entry
             return key;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
index bbc56cc..f1e01f2 100644
--- a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
@@ -40,7 +40,7 @@ public class ReducingKeyIterator implements CloseableIterator<DecoratedKey>
     {
         iters = new ArrayList<>(sstables.size());
         for (SSTableReader sstable : sstables)
-            iters.add(new KeyIterator(sstable.descriptor, sstable.metadata));
+            iters.add(new KeyIterator(sstable.descriptor));
     }
 
     private void maybeInit()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index b86d9b4..516534d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.*;
-import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -64,29 +63,31 @@ public abstract class SSTable
     public final Descriptor descriptor;
     protected final Set<Component> components;
     public final CFMetaData metadata;
+    public final IPartitioner partitioner;
     public final boolean compression;
 
     public DecoratedKey first;
     public DecoratedKey last;
 
-    protected SSTable(Descriptor descriptor, CFMetaData metadata)
+    protected SSTable(Descriptor descriptor, CFMetaData metadata, IPartitioner partitioner)
     {
-        this(descriptor, new HashSet<>(), metadata);
+        this(descriptor, new HashSet<>(), metadata, partitioner);
     }
 
-    protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata)
+    protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner)
     {
         // In almost all cases, metadata shouldn't be null, but allowing null allows to create a mostly functional SSTable without
         // full schema definition. SSTableLoader use that ability
         assert descriptor != null;
         assert components != null;
-        assert metadata != null;
+        assert partitioner != null;
 
         this.descriptor = descriptor;
         Set<Component> dataComponents = new HashSet<>(components);
         this.compression = dataComponents.contains(Component.COMPRESSION_INFO);
         this.components = new CopyOnWriteArraySet<>(dataComponents);
         this.metadata = metadata;
+        this.partitioner = partitioner;
     }
 
     /**
@@ -120,16 +121,6 @@ public abstract class SSTable
         return true;
     }
 
-    public IPartitioner getPartitioner()
-    {
-        return metadata.partitioner;
-    }
-
-    public DecoratedKey decorateKey(ByteBuffer key)
-    {
-        return getPartitioner().decorateKey(key);
-    }
-
     /**
      * If the given @param key occupies only part of a larger buffer, allocate a new buffer that is only
      * as large as necessary.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 20c3962..f25d3ff 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -133,7 +133,7 @@ public class SSTableLoader implements StreamEventHandler
                     // To conserve memory, open SSTableReaders without bloom filters and discard
                     // the index summary after calculating the file sections to stream and the estimated
                     // number of keys for each endpoint. See CASSANDRA-5555 for details.
-                    SSTableReader sstable = SSTableReader.openForBatch(desc, components, metadata);
+                    SSTableReader sstable = SSTableReader.openForBatch(desc, components, metadata, client.getPartitioner());
                     sstables.add(sstable);
 
                     // calculate the sstable sections to stream as well as the estimated number of
@@ -252,6 +252,7 @@ public class SSTableLoader implements StreamEventHandler
     public static abstract class Client
     {
         private final Map<InetAddress, Collection<Range<Token>>> endpointToRanges = new HashMap<>();
+        private IPartitioner partitioner;
 
         /**
          * Initialize the client.
@@ -298,6 +299,23 @@ public class SSTableLoader implements StreamEventHandler
             return endpointToRanges;
         }
 
+        protected void setPartitioner(String partclass) throws ConfigurationException
+        {
+            setPartitioner(FBUtilities.newPartitioner(partclass));
+        }
+
+        protected void setPartitioner(IPartitioner partitioner)
+        {
+            this.partitioner = partitioner;
+            // the following is still necessary since Range/Token reference partitioner through StorageService.getPartitioner
+            DatabaseDescriptor.setPartitioner(partitioner);
+        }
+
+        public IPartitioner getPartitioner()
+        {
+            return partitioner;
+        }
+
         protected void addRangeForEndpoint(Range<Token> range, InetAddress endpoint)
         {
             Collection<Range<Token>> ranges = endpointToRanges.get(endpoint);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index f4b9adf..a70b92f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredSerializer;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 /**
@@ -59,9 +60,9 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
     private final BlockingQueue<Buffer> writeQueue = new SynchronousQueue<Buffer>();
     private final DiskWriter diskWriter = new DiskWriter();
 
-    SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, PartitionColumns columns, long bufferSizeInMB)
+    SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns, long bufferSizeInMB)
     {
-        super(directory, metadata, columns);
+        super(directory, metadata, partitioner, columns);
         this.bufferSize = bufferSizeInMB * 1024L * 1024L;
         this.header = new SerializationHeader(metadata, columns, EncodingStats.NO_STATS);
         diskWriter.start();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index 45722cd..b22a048 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -44,9 +44,9 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
 
     private SSTableTxnWriter writer;
 
-    protected SSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns)
+    protected SSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns)
     {
-        super(directory, metadata, columns);
+        super(directory, metadata, partitioner, columns);
     }
 
     private SSTableTxnWriter getOrCreateWriter()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 5ceced5..6d39d2d 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -27,11 +27,10 @@ import java.util.concurrent.atomic.AtomicLong;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Ordering;
 import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.RateLimiter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
@@ -43,23 +42,27 @@ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.lifecycle.TransactionLogs;
-import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.big.BigTableWriter;
 import org.apache.cassandra.io.sstable.metadata.*;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.concurrent.SelfRefCounted;
 
@@ -348,18 +351,21 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
     public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException
     {
-        return open(desc, componentsFor(desc), metadata);
+        IPartitioner p = desc.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR)
+                ? new LocalPartitioner(metadata.getKeyValidator())
+                : StorageService.getPartitioner();
+        return open(desc, componentsFor(desc), metadata, p);
     }
 
-    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
+    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
     {
-        return open(descriptor, components, metadata, true, true);
+        return open(descriptor, components, metadata, partitioner, true, true);
     }
 
     // use only for offline or "Standalone" operations
     public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, ColumnFamilyStore cfs) throws IOException
     {
-        return open(descriptor, components, cfs.metadata, false, false); // do not track hotness
+        return open(descriptor, components, cfs.metadata, cfs.partitioner, false, false); // do not track hotness
     }
 
     /**
@@ -368,10 +374,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
      * @param descriptor
      * @param components
      * @param metadata
+     * @param partitioner
      * @return opened SSTableReader
      * @throws IOException
      */
-    public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
+    public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
     {
         // Minimum components without which we can't do anything
         assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
@@ -387,7 +394,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         // Check if sstable is created using same partitioner.
         // Partitioner can be null, which indicates older version of sstable or no stats available.
         // In that case, we skip the check.
-        String partitionerName = metadata.partitioner.getClass().getCanonicalName();
+        String partitionerName = partitioner.getClass().getCanonicalName();
         if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
         {
             logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
@@ -399,6 +406,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         SSTableReader sstable = internalOpen(descriptor,
                                              components,
                                              metadata,
+                                             partitioner,
                                              System.currentTimeMillis(),
                                              statsMetadata,
                                              OpenReason.NORMAL,
@@ -423,6 +431,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     public static SSTableReader open(Descriptor descriptor,
                                       Set<Component> components,
                                       CFMetaData metadata,
+                                      IPartitioner partitioner,
                                       boolean validate,
                                       boolean trackHotness) throws IOException
     {
@@ -443,7 +452,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         // Check if sstable is created using same partitioner.
         // Partitioner can be null, which indicates older version of sstable or no stats available.
         // In that case, we skip the check.
-        String partitionerName = metadata.partitioner.getClass().getCanonicalName();
+        String partitionerName = partitioner.getClass().getCanonicalName();
         if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
         {
             logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
@@ -455,6 +464,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         SSTableReader sstable = internalOpen(descriptor,
                                              components,
                                              metadata,
+                                             partitioner,
                                              System.currentTimeMillis(),
                                              statsMetadata,
                                              OpenReason.NORMAL,
@@ -492,7 +502,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     }
 
     public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
-                                                    final CFMetaData metadata)
+                                                    final CFMetaData metadata,
+                                                    final IPartitioner partitioner)
     {
         final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>();
 
@@ -506,7 +517,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                     SSTableReader sstable;
                     try
                     {
-                        sstable = open(entry.getKey(), entry.getValue(), metadata);
+                        sstable = open(entry.getKey(), entry.getValue(), metadata, partitioner);
                     }
                     catch (CorruptSSTableException ex)
                     {
@@ -551,6 +562,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     public static SSTableReader internalOpen(Descriptor desc,
                                       Set<Component> components,
                                       CFMetaData metadata,
+                                      IPartitioner partitioner,
                                       SegmentedFile ifile,
                                       SegmentedFile dfile,
                                       IndexSummary isummary,
@@ -560,9 +572,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                                       OpenReason openReason,
                                       SerializationHeader header)
     {
-        assert desc != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
+        assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
 
-        SSTableReader reader = internalOpen(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header);
+        SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
 
         reader.bf = bf;
         reader.ifile = ifile;
@@ -577,6 +589,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     private static SSTableReader internalOpen(final Descriptor descriptor,
                                             Set<Component> components,
                                             CFMetaData metadata,
+                                            IPartitioner partitioner,
                                             Long maxDataAge,
                                             StatsMetadata sstableMetadata,
                                             OpenReason openReason,
@@ -584,18 +597,19 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     {
         Factory readerFactory = descriptor.getFormat().getReaderFactory();
 
-        return readerFactory.open(descriptor, components, metadata, maxDataAge, sstableMetadata, openReason, header);
+        return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
     }
 
     protected SSTableReader(final Descriptor desc,
                             Set<Component> components,
                             CFMetaData metadata,
+                            IPartitioner partitioner,
                             long maxDataAge,
                             StatsMetadata sstableMetadata,
                             OpenReason openReason,
                             SerializationHeader header)
     {
-        super(desc, components, metadata);
+        super(desc, components, metadata, partitioner);
         this.sstableMetadata = sstableMetadata;
         this.header = header;
         this.maxDataAge = maxDataAge;
@@ -800,7 +814,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                 {
                     ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
                     RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex);
-                    DecoratedKey decoratedKey = decorateKey(key);
+                    DecoratedKey decoratedKey = partitioner.decorateKey(key);
                     if (first == null)
                         first = decoratedKey;
                     last = decoratedKey;
@@ -818,7 +832,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                 }
 
                 if (!summaryLoaded)
-                    indexSummary = summaryBuilder.build(getPartitioner());
+                    indexSummary = summaryBuilder.build(partitioner);
             }
         }
 
@@ -848,10 +862,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         {
             iStream = new DataInputStream(new FileInputStream(summariesFile));
             indexSummary = IndexSummary.serializer.deserialize(
-                    iStream, getPartitioner(), descriptor.version.hasSamplingLevel(),
+                    iStream, partitioner, descriptor.version.hasSamplingLevel(),
                     metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
-            first = decorateKey(ByteBufferUtil.readWithLength(iStream));
-            last = decorateKey(ByteBufferUtil.readWithLength(iStream));
+            first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+            last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
             ibuilder.deserializeBounds(iStream);
             dbuilder.deserializeBounds(iStream);
         }
@@ -1050,6 +1064,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         SSTableReader replacement = internalOpen(descriptor,
                                                  components,
                                                  metadata,
+                                                 partitioner,
                                                  ifile != null ? ifile.sharedCopy() : null,
                                                  dfile.sharedCopy(),
                                                  newSummary,
@@ -1153,7 +1168,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             else if (samplingLevel < indexSummary.getSamplingLevel())
             {
                 // we can use the existing index summary to make a smaller one
-                newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, getPartitioner());
+                newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
 
                 try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
                     SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression))
@@ -1188,11 +1203,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                 long indexPosition;
                 while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
                 {
-                    summaryBuilder.maybeAddEntry(decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
+                    summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
                     RowIndexEntry.Serializer.skip(primaryIndex);
                 }
 
-                return summaryBuilder.build(getPartitioner());
+                return summaryBuilder.build(partitioner);
             }
         }
         finally
@@ -1289,8 +1304,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
         CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata();
 
-        // We need the parent cf metadata
-        String cfName = metadata.isIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
+        //We need the parent cf metadata
+        String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
         cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName));
 
         return cmd;
@@ -1462,7 +1477,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                     public DecoratedKey next()
                     {
                         byte[] bytes = indexSummary.getKey(idx++);
-                        return decorateKey(ByteBuffer.wrap(bytes));
+                        return partitioner.decorateKey(ByteBuffer.wrap(bytes));
                     }
 
                     public void remove()
@@ -1604,7 +1619,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                 while (!in.isEOF())
                 {
                     ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
-                    DecoratedKey indexDecoratedKey = decorateKey(indexKey);
+                    DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
                     if (indexDecoratedKey.compareTo(token) > 0)
                         return indexDecoratedKey;
 
@@ -2285,6 +2300,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         public abstract SSTableReader open(final Descriptor descriptor,
                                            Set<Component> components,
                                            CFMetaData metadata,
+                                           IPartitioner partitioner,
                                            Long maxDataAge,
                                            StatsMetadata sstableMetadata,
                                            OpenReason openReason,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index fa691b8..08a9dcc 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -27,11 +27,13 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
@@ -72,10 +74,11 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                             long keyCount, 
                             long repairedAt, 
                             CFMetaData metadata, 
+                            IPartitioner partitioner, 
                             MetadataCollector metadataCollector, 
                             SerializationHeader header)
     {
-        super(descriptor, components(metadata), metadata);
+        super(descriptor, components(metadata), metadata, partitioner);
         this.keyCount = keyCount;
         this.repairedAt = repairedAt;
         this.metadataCollector = metadataCollector;
@@ -87,18 +90,19 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                        Long keyCount,
                                        Long repairedAt,
                                        CFMetaData metadata,
+                                       IPartitioner partitioner,
                                        MetadataCollector metadataCollector,
                                        SerializationHeader header,
                                        LifecycleTransaction txn)
     {
         Factory writerFactory = descriptor.getFormat().getWriterFactory();
-        return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn);
+        return writerFactory.open(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header, txn);
     }
 
     public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
     {
         CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
-        return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, txn);
+        return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, DatabaseDescriptor.getPartitioner(), header, txn);
     }
 
     public static SSTableWriter create(CFMetaData metadata,
@@ -106,11 +110,12 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                        long keyCount,
                                        long repairedAt,
                                        int sstableLevel,
+                                       IPartitioner partitioner,
                                        SerializationHeader header,
                                        LifecycleTransaction txn)
     {
         MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
-        return create(descriptor, keyCount, repairedAt, metadata, collector, header, txn);
+        return create(descriptor, keyCount, repairedAt, metadata, partitioner, collector, header, txn);
     }
 
     public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header,LifecycleTransaction txn)
@@ -250,7 +255,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
 
     protected Map<MetadataType, MetadataComponent> finalizeMetadata()
     {
-        return metadataCollector.finalizeMetadata(getPartitioner().getClass().getCanonicalName(),
+        return metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
                                                   metadata.getBloomFilterFpChance(),
                                                   repairedAt,
                                                   header);
@@ -282,6 +287,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                            long keyCount,
                                            long repairedAt,
                                            CFMetaData metadata,
+                                           IPartitioner partitioner,
                                            MetadataCollector metadataCollector,
                                            SerializationHeader header,
                                            LifecycleTransaction txn);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index 860cd9f..a072d4d 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -23,6 +23,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
@@ -85,20 +86,21 @@ public class BigFormat implements SSTableFormat
                                   long keyCount, 
                                   long repairedAt, 
                                   CFMetaData metadata, 
+                                  IPartitioner partitioner, 
                                   MetadataCollector metadataCollector, 
                                   SerializationHeader header,
                                   LifecycleTransaction txn)
         {
-            return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn);
+            return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header, txn);
         }
     }
 
     static class ReaderFactory extends SSTableReader.Factory
     {
         @Override
-        public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header)
+        public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header)
         {
-            return new BigTableReader(descriptor, components, metadata, maxDataAge, sstableMetadata, openReason, header);
+            return new BigTableReader(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 87608fd..b539c79 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -52,9 +52,9 @@ public class BigTableReader extends SSTableReader
 {
     private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class);
 
-    BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header)
+    BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header)
     {
-        super(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header);
+        super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
     }
 
     public SliceableUnfilteredRowIterator iterator(DecoratedKey key, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
@@ -201,7 +201,7 @@ public class BigTableReader extends SSTableReader
                     }
                     else
                     {
-                        DecoratedKey indexDecoratedKey = decorateKey(indexKey);
+                        DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
                         int comparison = indexDecoratedKey.compareTo(key);
                         int v = op.apply(comparison);
                         opSatisfied = (v == 0);
@@ -227,7 +227,7 @@ public class BigTableReader extends SSTableReader
                                 // expensive sanity check!  see CASSANDRA-4687
                                 try (FileDataInput fdi = dfile.getSegment(indexEntry.position))
                                 {
-                                    DecoratedKey keyInDisk = decorateKey(ByteBufferUtil.readWithShortLength(fdi));
+                                    DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi));
                                     if (!keyInDisk.equals(key))
                                         throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
                                 }