You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/08/01 00:35:38 UTC
[3/4] cassandra git commit: Revert "Revert "Stop accessing the
partitioner directly via StorageService""
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 94031ab..9221090 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -42,15 +42,12 @@ import org.apache.cassandra.db.index.keys.KeysIndex;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.LocalByPartionerType;
+import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-
import org.apache.cassandra.utils.concurrent.Refs;
/**
@@ -79,10 +76,6 @@ public abstract class SecondaryIndex
*/
public static final String INDEX_ENTRIES_OPTION_NAME = "index_keys_and_values";
- public static final AbstractType<?> keyComparator = StorageService.getPartitioner().preservesOrder()
- ? BytesType.instance
- : new LocalByPartionerType(StorageService.getPartitioner());
-
/**
* Base CF that has many indexes
*/
@@ -303,7 +296,7 @@ public abstract class SecondaryIndex
*/
public DecoratedKey getIndexKeyFor(ByteBuffer value)
{
- return getIndexCfs().partitioner.decorateKey(value);
+ return getIndexCfs().decorateKey(value);
}
/**
@@ -381,11 +374,20 @@ public abstract class SecondaryIndex
*/
public static CFMetaData newIndexMetadata(CFMetaData baseMetadata, ColumnDefinition def)
{
+ return newIndexMetadata(baseMetadata, def, def.type);
+ }
+
+ /**
+ * Create the index metadata for the index on a given column of a given table.
+ */
+ static CFMetaData newIndexMetadata(CFMetaData baseMetadata, ColumnDefinition def, AbstractType<?> comparator)
+ {
if (def.getIndexType() == IndexType.CUSTOM)
return null;
CFMetaData.Builder builder = CFMetaData.Builder.create(baseMetadata.ksName, baseMetadata.indexColumnFamilyName(def))
.withId(baseMetadata.cfId)
+ .withPartitioner(new LocalPartitioner(comparator))
.addPartitionKey(def.name, def.type);
if (def.getIndexType() == IndexType.COMPOSITES)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index 42861c5..29f235c 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -101,7 +101,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
protected static void addGenericClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
{
- indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator);
+ indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
for (ColumnDefinition def : baseMetadata.clusteringColumns())
indexMetadata.addClusteringColumn(def.name, def.type);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
index 6529ad9..cd4aff9 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
@@ -24,7 +24,6 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.utils.concurrent.OpOrder;
/**
@@ -48,7 +47,7 @@ public class CompositesIndexOnClusteringKey extends CompositesIndex
{
public static void addClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
{
- indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator);
+ indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
List<ColumnDefinition> cks = baseMetadata.clusteringColumns();
for (int i = 0; i < columnDef.position(); i++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index d322faf..b76bf7e 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -111,7 +111,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
// *data* for a given partition.
BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(baseCfs.getComparator());
List<CompositesIndex.IndexedEntry> entries = new ArrayList<>();
- DecoratedKey partitionKey = baseCfs.partitioner.decorateKey(nextEntry.indexedKey);
+ DecoratedKey partitionKey = baseCfs.decorateKey(nextEntry.indexedKey);
while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
index 7930bd6..478559a 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
@@ -23,7 +23,6 @@ import java.util.Set;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexSearcher;
@@ -42,7 +41,7 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
{
public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition cfDef)
{
- indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator);
+ indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index bcaf70b..53a9b4a 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -84,7 +84,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
while (next == null && indexHits.hasNext())
{
Row hit = indexHits.next();
- DecoratedKey key = baseCfs.partitioner.decorateKey(hit.clustering().get(0));
+ DecoratedKey key = baseCfs.decorateKey(hit.clustering().get(0));
SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
baseCfs.metadata,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java b/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
deleted file mode 100644
index e02ba3c..0000000
--- a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.marshal;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.cql3.Term;
-import org.apache.cassandra.db.PartitionPosition;
-import org.apache.cassandra.serializers.TypeSerializer;
-import org.apache.cassandra.serializers.MarshalException;
-
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-/** for sorting columns representing row keys in the row ordering as determined by a partitioner.
- * Not intended for user-defined CFs, and will in fact error out if used with such. */
-public class LocalByPartionerType extends AbstractType<ByteBuffer>
-{
- private final IPartitioner partitioner;
-
- public LocalByPartionerType(IPartitioner partitioner)
- {
- this.partitioner = partitioner;
- }
-
- public static LocalByPartionerType getInstance(TypeParser parser)
- {
- return new LocalByPartionerType(StorageService.getPartitioner());
- }
-
- @Override
- public ByteBuffer compose(ByteBuffer bytes)
- {
- throw new UnsupportedOperationException("You can't do this with a local partitioner.");
- }
-
- @Override
- public ByteBuffer decompose(ByteBuffer bytes)
- {
- throw new UnsupportedOperationException("You can't do this with a local partitioner.");
- }
-
- public String getString(ByteBuffer bytes)
- {
- return ByteBufferUtil.bytesToHex(bytes);
- }
-
- public ByteBuffer fromString(String source)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Term fromJSONObject(Object parsed)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String toJSONString(ByteBuffer buffer, int protocolVersion)
- {
- throw new UnsupportedOperationException();
- }
-
- public int compare(ByteBuffer o1, ByteBuffer o2)
- {
- // o1 and o2 can be empty so we need to use PartitionPosition, not DecoratedKey
- return PartitionPosition.ForKey.get(o1, partitioner).compareTo(PartitionPosition.ForKey.get(o2, partitioner));
- }
-
- @Override
- public void validate(ByteBuffer bytes) throws MarshalException
- {
- throw new IllegalStateException("You shouldn't be validating this.");
- }
-
- public TypeSerializer<ByteBuffer> getSerializer()
- {
- throw new UnsupportedOperationException("You can't do this with a local partitioner.");
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java b/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java
new file mode 100644
index 0000000..efaea53
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.marshal;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.serializers.TypeSerializer;
+import org.apache.cassandra.serializers.MarshalException;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/** for sorting columns representing row keys in the row ordering as determined by a partitioner.
+ * Not intended for user-defined CFs, and will in fact error out if used with such. */
+public class PartitionerDefinedOrder extends AbstractType<ByteBuffer>
+{
+ private final IPartitioner partitioner;
+
+ public PartitionerDefinedOrder(IPartitioner partitioner)
+ {
+ this.partitioner = partitioner;
+ }
+
+ @Override
+ public ByteBuffer compose(ByteBuffer bytes)
+ {
+ throw new UnsupportedOperationException("You can't do this with a local partitioner.");
+ }
+
+ @Override
+ public ByteBuffer decompose(ByteBuffer bytes)
+ {
+ throw new UnsupportedOperationException("You can't do this with a local partitioner.");
+ }
+
+ public String getString(ByteBuffer bytes)
+ {
+ return ByteBufferUtil.bytesToHex(bytes);
+ }
+
+ public ByteBuffer fromString(String source)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Term fromJSONObject(Object parsed)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String toJSONString(ByteBuffer buffer, int protocolVersion)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public int compare(ByteBuffer o1, ByteBuffer o2)
+ {
+ // o1 and o2 can be empty so we need to use PartitionPosition, not DecoratedKey
+ return PartitionPosition.ForKey.get(o1, partitioner).compareTo(PartitionPosition.ForKey.get(o2, partitioner));
+ }
+
+ @Override
+ public void validate(ByteBuffer bytes) throws MarshalException
+ {
+ throw new IllegalStateException("You shouldn't be validating this.");
+ }
+
+ public TypeSerializer<ByteBuffer> getSerializer()
+ {
+ throw new UnsupportedOperationException("You can't do this with a local partitioner.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index e8ec4c0..25d1887 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.filter.ColumnFilter;
@@ -42,7 +43,6 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.Locks;
import org.apache.cassandra.utils.memory.MemtableAllocator;
import org.apache.cassandra.utils.memory.HeapAllocator;
-import org.apache.cassandra.service.StorageService;
import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
import static org.apache.cassandra.utils.btree.BTree.Dir.desc;
@@ -59,7 +59,7 @@ public class AtomicBTreePartition implements Partition
private static final Logger logger = LoggerFactory.getLogger(AtomicBTreePartition.class);
public static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreePartition(CFMetaData.createFake("keyspace", "table"),
- StorageService.getPartitioner().decorateKey(ByteBuffer.allocate(1)),
+ DatabaseDescriptor.getPartitioner().decorateKey(ByteBuffer.allocate(1)),
null));
// Reserved values for wasteTracker field. These values must not be consecutive (see avoidReservedValues)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 102008f..f2e0617 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -102,6 +102,17 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
this(metadata, key, columns, Rows.EMPTY_STATIC_ROW, new ArrayList<>(initialRowCapacity), MutableDeletionInfo.live(), null, false, true);
}
+ public PartitionUpdate(CFMetaData metadata,
+ ByteBuffer key,
+ PartitionColumns columns,
+ int initialRowCapacity)
+ {
+ this(metadata,
+ metadata.decorateKey(key),
+ columns,
+ initialRowCapacity);
+ }
+
/**
* Creates a empty immutable partition update.
*
@@ -134,7 +145,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
* Creates an immutable partition update that contains a single row update.
*
* @param metadata the metadata for the created update.
- * @param key the partition key for the partition that the created update should delete.
+ * @param key the partition key for the partition to update.
* @param row the row for the update.
*
* @return the newly created partition update containing only {@code row}.
@@ -147,6 +158,20 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
}
/**
+ * Creates an immutable partition update that contains a single row update.
+ *
+ * @param metadata the metadata for the created update.
+ * @param key the partition key for the partition to update.
+ * @param row the row for the update.
+ *
+ * @return the newly created partition update containing only {@code row}.
+ */
+ public static PartitionUpdate singleRowUpdate(CFMetaData metadata, ByteBuffer key, Row row)
+ {
+ return singleRowUpdate(metadata, metadata.decorateKey(key), row);
+ }
+
+ /**
* Turns the given iterator into an update.
*
* Warning: this method does not close the provided iterator, it is up to
@@ -262,6 +287,21 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
}
/**
+ * Creates a partition update that entirely deletes a given partition.
+ *
+ * @param metadata the metadata for the created update.
+ * @param key the partition key for the partition that the created update should delete.
+ * @param timestamp the timestamp for the deletion.
+ * @param nowInSec the current time in seconds to use as local deletion time for the partition deletion.
+ *
+ * @return the newly created partition deletion update.
+ */
+ public static PartitionUpdate fullPartitionDelete(CFMetaData metadata, ByteBuffer key, long timestamp, int nowInSec)
+ {
+ return fullPartitionDelete(metadata, metadata.decorateKey(key), timestamp, nowInSec);
+ }
+
+ /**
* Merges the provided updates, yielding a new update that incorporates all those updates.
*
* @param updates the collection of updates to merge. This shouldn't be empty.
@@ -695,29 +735,39 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
}
}
- public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, DecoratedKey key) throws IOException
+ public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException
{
- if (version < MessagingService.VERSION_30)
+ if (version >= MessagingService.VERSION_30)
+ {
+ assert key == null; // key is only there for the old format
+ return deserialize30(in, version, flag);
+ }
+ else
{
assert key != null;
-
- // This is only used in mutation, and mutation have never allowed "null" column families
- boolean present = in.readBoolean();
- assert present;
-
- CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
- LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version);
- int size = in.readInt();
- Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.deserializeCells(metadata, in, flag, size);
- SerializationHelper helper = new SerializationHelper(metadata, version, flag);
- try (UnfilteredRowIterator iterator = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, key, info, cells, false, helper))
- {
- return PartitionUpdate.fromIterator(iterator);
- }
+ CFMetaData metadata = deserializeMetadata(in, version);
+ DecoratedKey dk = metadata.decorateKey(key);
+ return deserializePre30(in, version, flag, metadata, dk);
}
+ }
- assert key == null; // key is only there for the old format
+ // Used to share the same decorated key between updates.
+ public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, DecoratedKey key) throws IOException
+ {
+ if (version >= MessagingService.VERSION_30)
+ {
+ return deserialize30(in, version, flag);
+ }
+ else
+ {
+ assert key != null;
+ CFMetaData metadata = deserializeMetadata(in, version);
+ return deserializePre30(in, version, flag, metadata, key);
+ }
+ }
+ private static PartitionUpdate deserialize30(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
+ {
CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, metadata, flag);
if (header.isEmpty)
@@ -752,6 +802,28 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
false);
}
+ private static CFMetaData deserializeMetadata(DataInputPlus in, int version) throws IOException
+ {
+ // This is only used in mutations, and mutations have never allowed "null" column families
+ boolean present = in.readBoolean();
+ assert present;
+
+ CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+ return metadata;
+ }
+
+ private static PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, CFMetaData metadata, DecoratedKey dk) throws IOException
+ {
+ LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version);
+ int size = in.readInt();
+ Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.deserializeCells(metadata, in, flag, size);
+ SerializationHelper helper = new SerializationHelper(metadata, version, flag);
+ try (UnfilteredRowIterator iterator = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, dk, info, cells, false, helper))
+ {
+ return PartitionUpdate.fromIterator(iterator);
+ }
+ }
+
public long serializedSize(PartitionUpdate update, int version)
{
if (version < MessagingService.VERSION_30)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index b96e0b1..531bd26 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
@@ -170,7 +169,7 @@ public class UnfilteredRowIteratorSerializer
public Header deserializeHeader(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag) throws IOException
{
- DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithVIntLength(in));
+ DecoratedKey key = metadata.decorateKey(ByteBufferUtil.readWithVIntLength(in));
int flags = in.readUnsignedByte();
boolean isReversed = (flags & IS_REVERSED) != 0;
if ((flags & IS_EMPTY) != 0)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/view/MaterializedView.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java
index 082c71d..f36abae 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedView.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedView.java
@@ -303,9 +303,10 @@ public class MaterializedView
partitionKey[i] = value;
}
- return getViewCfs().partitioner.decorateKey(CFMetaData.serializePartitionKey(getViewCfs().metadata
- .getKeyValidatorAsClusteringComparator()
- .make(partitionKey)));
+ CFMetaData metadata = getViewCfs().metadata;
+ return metadata.decorateKey(CFMetaData.serializePartitionKey(metadata
+ .getKeyValidatorAsClusteringComparator()
+ .make(partitionKey)));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/view/TemporalRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java
index 53e4e91..d0ba5ea 100644
--- a/src/java/org/apache/cassandra/db/view/TemporalRow.java
+++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java
@@ -377,7 +377,7 @@ public class TemporalRow
this.baseCfs = baseCfs;
this.viewPrimaryKey = viewPrimaryKey;
this.key = key;
- this.dk = baseCfs.partitioner.decorateKey(key);
+ this.dk = baseCfs.decorateKey(key);
this.clusteringToRow = new HashMap<>();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 2cb7f61..31fda34 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -164,7 +164,7 @@ public class BootStrapper extends ProgressEventNotifierSupport
// if user specified tokens, use those
if (initialTokens.size() > 0)
return getSpecifiedTokens(metadata, initialTokens);
-
+
int numTokens = DatabaseDescriptor.getNumTokens();
if (numTokens < 1)
throw new ConfigurationException("num_tokens must be >= 1");
@@ -179,13 +179,13 @@ public class BootStrapper extends ProgressEventNotifierSupport
}
private static Collection<Token> getSpecifiedTokens(final TokenMetadata metadata,
- Collection<String> initialTokens)
+ Collection<String> initialTokens)
{
logger.debug("tokens manually specified as {}", initialTokens);
List<Token> tokens = new ArrayList<>(initialTokens.size());
for (String tokenString : initialTokens)
{
- Token token = StorageService.getPartitioner().getTokenFactory().fromString(tokenString);
+ Token token = metadata.partitioner.getTokenFactory().fromString(tokenString);
if (metadata.getEndpoint(token) != null)
throw new ConfigurationException("Bootstrapping to existing token " + tokenString + " is not allowed (decommission/removenode the old node first).");
tokens.add(token);
@@ -202,8 +202,8 @@ public class BootStrapper extends ProgressEventNotifierSupport
if (ks == null)
throw new ConfigurationException("Problem opening token allocation keyspace " + allocationKeyspace);
AbstractReplicationStrategy rs = ks.getReplicationStrategy();
-
- return TokenAllocation.allocateTokens(metadata, rs, StorageService.getPartitioner(), address, numTokens);
+
+ return TokenAllocation.allocateTokens(metadata, rs, address, numTokens);
}
public static Collection<Token> getRandomTokens(TokenMetadata metadata, int numTokens)
@@ -211,7 +211,7 @@ public class BootStrapper extends ProgressEventNotifierSupport
Set<Token> tokens = new HashSet<>(numTokens);
while (tokens.size() < numTokens)
{
- Token token = StorageService.getPartitioner().getRandomToken();
+ Token token = metadata.partitioner.getRandomToken();
if (metadata.getEndpoint(token) == null)
tokens.add(token);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
index d7139d0..46872c1 100644
--- a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
@@ -300,4 +300,9 @@ public class ByteOrderedPartitioner implements IPartitioner
{
return BytesType.instance;
}
+
+ public AbstractType<?> partitionOrdering()
+ {
+ return BytesType.instance;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/IPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java b/src/java/org/apache/cassandra/dht/IPartitioner.java
index b22da66..e0a08dc 100644
--- a/src/java/org/apache/cassandra/dht/IPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/IPartitioner.java
@@ -78,4 +78,10 @@ public interface IPartitioner
public Map<Token, Float> describeOwnership(List<Token> sortedTokens);
public AbstractType<?> getTokenValidator();
+
+ /**
+ * Abstract type that orders the same way as DecoratedKeys provided by this partitioner.
+ * Used by secondary indices.
+ */
+ public AbstractType<?> partitionOrdering();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/LocalPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
index 01dc75e..2a5a16e 100644
--- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
@@ -84,6 +84,11 @@ public class LocalPartitioner implements IPartitioner
return comparator;
}
+ public AbstractType<?> partitionOrdering()
+ {
+ return comparator;
+ }
+
public class LocalToken extends ComparableObjectToken<ByteBuffer>
{
static final long serialVersionUID = 8437543776403014875L;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
index 003879c..d68be3f 100644
--- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
+++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.PreHashedDecoratedKey;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.PartitionerDefinedOrder;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -45,6 +46,7 @@ public class Murmur3Partitioner implements IPartitioner
private static final int HEAP_SIZE = (int) ObjectSizes.measureDeep(MINIMUM);
public static final Murmur3Partitioner instance = new Murmur3Partitioner();
+ public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance);
public DecoratedKey decorateKey(ByteBuffer key)
{
@@ -288,4 +290,9 @@ public class Murmur3Partitioner implements IPartitioner
{
return LongType.instance;
}
+
+ public AbstractType<?> partitionOrdering()
+ {
+ return partitionOrdering;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
index ae0326f..45c2cfa 100644
--- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
@@ -240,4 +240,9 @@ public class OrderPreservingPartitioner implements IPartitioner
{
return UTF8Type.instance;
}
+
+ public AbstractType<?> partitionOrdering()
+ {
+ return UTF8Type.instance;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
index 71a0a99..b0dea01 100644
--- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.PartitionerDefinedOrder;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.GuidGenerator;
@@ -47,6 +48,7 @@ public class RandomPartitioner implements IPartitioner
private static final int HEAP_SIZE = (int) ObjectSizes.measureDeep(new BigIntegerToken(FBUtilities.hashToBigInteger(ByteBuffer.allocate(1))));
public static final RandomPartitioner instance = new RandomPartitioner();
+ public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance);
public DecoratedKey decorateKey(ByteBuffer key)
{
@@ -196,4 +198,9 @@ public class RandomPartitioner implements IPartitioner
{
return IntegerType.instance;
}
+
+ public AbstractType<?> partitionOrdering()
+ {
+ return partitionOrdering;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 68c8a11..e7624c3 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -333,7 +333,7 @@ public class RangeStreamer
Collection<Range<Token>> ranges = entry.getValue().getValue();
// filter out already streamed ranges
- Set<Range<Token>> availableRanges = stateStore.getAvailableRanges(keyspace, StorageService.getPartitioner());
+ Set<Range<Token>> availableRanges = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner);
if (ranges.removeAll(availableRanges))
{
logger.info("Some ranges of {} are already available. Skipping streaming those ranges.", availableRanges);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
index dd3a02b..b4281ce 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
@@ -33,7 +33,6 @@ import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
@@ -49,12 +48,11 @@ public class TokenAllocation
public static Collection<Token> allocateTokens(final TokenMetadata tokenMetadata,
final AbstractReplicationStrategy rs,
- final IPartitioner partitioner,
final InetAddress endpoint,
int numTokens)
{
StrategyAdapter strategy = getStrategy(tokenMetadata, rs, endpoint);
- Collection<Token> tokens = create(tokenMetadata, strategy, partitioner).addUnit(endpoint, numTokens);
+ Collection<Token> tokens = create(tokenMetadata, strategy).addUnit(endpoint, numTokens);
tokens = adjustForCrossDatacenterClashes(tokenMetadata, strategy, tokens);
if (logger.isWarnEnabled())
@@ -141,7 +139,7 @@ public class TokenAllocation
return stat;
}
- static TokenAllocator<InetAddress> create(TokenMetadata tokenMetadata, StrategyAdapter strategy, IPartitioner partitioner)
+ static TokenAllocator<InetAddress> create(TokenMetadata tokenMetadata, StrategyAdapter strategy)
{
NavigableMap<Token, InetAddress> sortedTokens = new TreeMap<>();
for (Map.Entry<Token, InetAddress> en : tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap().entrySet())
@@ -149,7 +147,7 @@ public class TokenAllocation
if (strategy.inAllocationRing(en.getValue()))
sortedTokens.put(en.getKey(), en.getValue());
}
- return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, partitioner);
+ return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, tokenMetadata.partitioner);
}
interface StrategyAdapter extends ReplicationStrategy<InetAddress>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index e61a35a..5fa402a 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -584,7 +584,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
JVMStabilityInspector.inspectThrowable(th);
// TODO this is broken
logger.warn("Unable to calculate tokens for {}. Will use a random one", address);
- tokens = Collections.singletonList(StorageService.getPartitioner().getRandomToken());
+ tokens = Collections.singletonList(StorageService.instance.getTokenMetadata().partitioner.getRandomToken());
}
int generation = epState.getHeartBeatState().getGeneration();
int heartbeat = epState.getHeartBeatState().getHeartBeatVersion();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index acc9141..f4b4da8 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.Closeable;
+import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -30,7 +31,6 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.Pair;
@@ -46,12 +46,11 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
protected SSTableFormat.Type formatType = DatabaseDescriptor.getSSTableFormat();
protected static AtomicInteger generation = new AtomicInteger(0);
- protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns)
+ protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns)
{
this.metadata = metadata;
this.directory = directory;
this.columns = columns;
- DatabaseDescriptor.setPartitioner(partitioner);
}
protected void setSSTableFormatType(SSTableFormat.Type type)
@@ -103,6 +102,11 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
return maxGen;
}
+ PartitionUpdate getUpdateFor(ByteBuffer key) throws IOException
+ {
+ return getUpdateFor(metadata.decorateKey(key));
+ }
+
/**
* Returns a PartitionUpdate suitable to write on this writer for the provided key.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 43e214b..7ae5651 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -77,6 +77,9 @@ public class CQLSSTableWriter implements Closeable
static
{
Config.setClientMode(true);
+ // Partitioner is not set in client mode.
+ if (DatabaseDescriptor.getPartitioner() == null)
+ DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
}
private final AbstractSSTableSimpleWriter writer;
@@ -219,10 +222,7 @@ public class CQLSSTableWriter implements Closeable
try
{
for (ByteBuffer key : keys)
- {
- DecoratedKey dk = DatabaseDescriptor.getPartitioner().decorateKey(key);
- insert.addUpdateForKey(writer.getUpdateFor(dk), clustering, params);
- }
+ insert.addUpdateForKey(writer.getUpdateFor(key), clustering, params);
return this;
}
catch (SSTableSimpleUnsortedWriter.SyncException e)
@@ -277,7 +277,6 @@ public class CQLSSTableWriter implements Closeable
public static class Builder
{
private File directory;
- private IPartitioner partitioner = Murmur3Partitioner.instance;
protected SSTableFormat.Type formatType = null;
@@ -402,7 +401,7 @@ public class CQLSSTableWriter implements Closeable
*/
public Builder withPartitioner(IPartitioner partitioner)
{
- this.partitioner = partitioner;
+ this.schema = schema.copy(partitioner);
return this;
}
@@ -511,8 +510,8 @@ public class CQLSSTableWriter implements Closeable
throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()");
AbstractSSTableSimpleWriter writer = sorted
- ? new SSTableSimpleWriter(directory, schema, partitioner, insert.updatedColumns())
- : new SSTableSimpleUnsortedWriter(directory, schema, partitioner, insert.updatedColumns(), bufferSizeInMB);
+ ? new SSTableSimpleWriter(directory, schema, insert.updatedColumns())
+ : new SSTableSimpleUnsortedWriter(directory, schema, insert.updatedColumns(), bufferSizeInMB);
if (formatType != null)
writer.setSSTableFormatType(formatType);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
index 5de2452..8fb300b 100644
--- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
@@ -23,8 +23,10 @@ import java.io.IOException;
import com.google.common.collect.AbstractIterator;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -80,11 +82,13 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
}
private final In in;
+ private final IPartitioner partitioner;
- public KeyIterator(Descriptor desc)
+ public KeyIterator(Descriptor desc, CFMetaData metadata)
{
in = new In(new File(desc.filenameFor(Component.PRIMARY_INDEX)));
+ partitioner = metadata.partitioner;
}
protected DecoratedKey computeNext()
@@ -94,7 +98,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
if (in.isEOF())
return endOfData();
- DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in.get()));
+ DecoratedKey key = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in.get()));
RowIndexEntry.Serializer.skip(in.get()); // skip remainder of the entry
return key;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
index f1e01f2..bbc56cc 100644
--- a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
@@ -40,7 +40,7 @@ public class ReducingKeyIterator implements CloseableIterator<DecoratedKey>
{
iters = new ArrayList<>(sstables.size());
for (SSTableReader sstable : sstables)
- iters.add(new KeyIterator(sstable.descriptor));
+ iters.add(new KeyIterator(sstable.descriptor, sstable.metadata));
}
private void maybeInit()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 516534d..b86d9b4 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.io.sstable;
import java.io.*;
+import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -63,31 +64,29 @@ public abstract class SSTable
public final Descriptor descriptor;
protected final Set<Component> components;
public final CFMetaData metadata;
- public final IPartitioner partitioner;
public final boolean compression;
public DecoratedKey first;
public DecoratedKey last;
- protected SSTable(Descriptor descriptor, CFMetaData metadata, IPartitioner partitioner)
+ protected SSTable(Descriptor descriptor, CFMetaData metadata)
{
- this(descriptor, new HashSet<>(), metadata, partitioner);
+ this(descriptor, new HashSet<>(), metadata);
}
- protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner)
+ protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata)
{
// In almost all cases, metadata shouldn't be null, but allowing null allows to create a mostly functional SSTable without
// full schema definition. SSTableLoader use that ability
assert descriptor != null;
assert components != null;
- assert partitioner != null;
+ assert metadata != null;
this.descriptor = descriptor;
Set<Component> dataComponents = new HashSet<>(components);
this.compression = dataComponents.contains(Component.COMPRESSION_INFO);
this.components = new CopyOnWriteArraySet<>(dataComponents);
this.metadata = metadata;
- this.partitioner = partitioner;
}
/**
@@ -121,6 +120,16 @@ public abstract class SSTable
return true;
}
+ public IPartitioner getPartitioner()
+ {
+ return metadata.partitioner;
+ }
+
+ public DecoratedKey decorateKey(ByteBuffer key)
+ {
+ return getPartitioner().decorateKey(key);
+ }
+
/**
* If the given @param key occupies only part of a larger buffer, allocate a new buffer that is only
* as large as necessary.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index f25d3ff..20c3962 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -133,7 +133,7 @@ public class SSTableLoader implements StreamEventHandler
// To conserve memory, open SSTableReaders without bloom filters and discard
// the index summary after calculating the file sections to stream and the estimated
// number of keys for each endpoint. See CASSANDRA-5555 for details.
- SSTableReader sstable = SSTableReader.openForBatch(desc, components, metadata, client.getPartitioner());
+ SSTableReader sstable = SSTableReader.openForBatch(desc, components, metadata);
sstables.add(sstable);
// calculate the sstable sections to stream as well as the estimated number of
@@ -252,7 +252,6 @@ public class SSTableLoader implements StreamEventHandler
public static abstract class Client
{
private final Map<InetAddress, Collection<Range<Token>>> endpointToRanges = new HashMap<>();
- private IPartitioner partitioner;
/**
* Initialize the client.
@@ -299,23 +298,6 @@ public class SSTableLoader implements StreamEventHandler
return endpointToRanges;
}
- protected void setPartitioner(String partclass) throws ConfigurationException
- {
- setPartitioner(FBUtilities.newPartitioner(partclass));
- }
-
- protected void setPartitioner(IPartitioner partitioner)
- {
- this.partitioner = partitioner;
- // the following is still necessary since Range/Token reference partitioner through StorageService.getPartitioner
- DatabaseDescriptor.setPartitioner(partitioner);
- }
-
- public IPartitioner getPartitioner()
- {
- return partitioner;
- }
-
protected void addRangeForEndpoint(Range<Token> range, InetAddress endpoint)
{
Collection<Range<Token>> ranges = endpointToRanges.get(endpoint);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index a70b92f..f4b9adf 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.UnfilteredSerializer;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.utils.JVMStabilityInspector;
/**
@@ -60,9 +59,9 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
private final BlockingQueue<Buffer> writeQueue = new SynchronousQueue<Buffer>();
private final DiskWriter diskWriter = new DiskWriter();
- SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns, long bufferSizeInMB)
+ SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, PartitionColumns columns, long bufferSizeInMB)
{
- super(directory, metadata, partitioner, columns);
+ super(directory, metadata, columns);
this.bufferSize = bufferSizeInMB * 1024L * 1024L;
this.header = new SerializationHeader(metadata, columns, EncodingStats.NO_STATS);
diskWriter.start();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index b22a048..45722cd 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -44,9 +44,9 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
private SSTableTxnWriter writer;
- protected SSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns)
+ protected SSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns)
{
- super(directory, metadata, partitioner, columns);
+ super(directory, metadata, columns);
}
private SSTableTxnWriter getOrCreateWriter()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 6d39d2d..5ceced5 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -27,10 +27,11 @@ import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
@@ -42,27 +43,23 @@ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.lifecycle.TransactionLogs;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.*;
-import org.apache.cassandra.io.sstable.format.big.BigTableWriter;
import org.apache.cassandra.io.sstable.metadata.*;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.metrics.RestorableMeter;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.CacheService;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.SelfRefCounted;
@@ -351,21 +348,18 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException
{
- IPartitioner p = desc.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR)
- ? new LocalPartitioner(metadata.getKeyValidator())
- : StorageService.getPartitioner();
- return open(desc, componentsFor(desc), metadata, p);
+ return open(desc, componentsFor(desc), metadata);
}
- public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
+ public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
{
- return open(descriptor, components, metadata, partitioner, true, true);
+ return open(descriptor, components, metadata, true, true);
}
// use only for offline or "Standalone" operations
public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, ColumnFamilyStore cfs) throws IOException
{
- return open(descriptor, components, cfs.metadata, cfs.partitioner, false, false); // do not track hotness
+ return open(descriptor, components, cfs.metadata, false, false); // do not track hotness
}
/**
@@ -374,11 +368,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
* @param descriptor
* @param components
* @param metadata
- * @param partitioner
* @return opened SSTableReader
* @throws IOException
*/
- public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
+ public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
{
// Minimum components without which we can't do anything
assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
@@ -394,7 +387,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
// Check if sstable is created using same partitioner.
// Partitioner can be null, which indicates older version of sstable or no stats available.
// In that case, we skip the check.
- String partitionerName = partitioner.getClass().getCanonicalName();
+ String partitionerName = metadata.partitioner.getClass().getCanonicalName();
if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
{
logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
@@ -406,7 +399,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
SSTableReader sstable = internalOpen(descriptor,
components,
metadata,
- partitioner,
System.currentTimeMillis(),
statsMetadata,
OpenReason.NORMAL,
@@ -431,7 +423,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public static SSTableReader open(Descriptor descriptor,
Set<Component> components,
CFMetaData metadata,
- IPartitioner partitioner,
boolean validate,
boolean trackHotness) throws IOException
{
@@ -452,7 +443,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
// Check if sstable is created using same partitioner.
// Partitioner can be null, which indicates older version of sstable or no stats available.
// In that case, we skip the check.
- String partitionerName = partitioner.getClass().getCanonicalName();
+ String partitionerName = metadata.partitioner.getClass().getCanonicalName();
if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
{
logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
@@ -464,7 +455,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
SSTableReader sstable = internalOpen(descriptor,
components,
metadata,
- partitioner,
System.currentTimeMillis(),
statsMetadata,
OpenReason.NORMAL,
@@ -502,8 +492,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
- final CFMetaData metadata,
- final IPartitioner partitioner)
+ final CFMetaData metadata)
{
final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>();
@@ -517,7 +506,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
SSTableReader sstable;
try
{
- sstable = open(entry.getKey(), entry.getValue(), metadata, partitioner);
+ sstable = open(entry.getKey(), entry.getValue(), metadata);
}
catch (CorruptSSTableException ex)
{
@@ -562,7 +551,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public static SSTableReader internalOpen(Descriptor desc,
Set<Component> components,
CFMetaData metadata,
- IPartitioner partitioner,
SegmentedFile ifile,
SegmentedFile dfile,
IndexSummary isummary,
@@ -572,9 +560,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
OpenReason openReason,
SerializationHeader header)
{
- assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
+ assert desc != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
- SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
+ SSTableReader reader = internalOpen(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header);
reader.bf = bf;
reader.ifile = ifile;
@@ -589,7 +577,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
private static SSTableReader internalOpen(final Descriptor descriptor,
Set<Component> components,
CFMetaData metadata,
- IPartitioner partitioner,
Long maxDataAge,
StatsMetadata sstableMetadata,
OpenReason openReason,
@@ -597,19 +584,18 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
{
Factory readerFactory = descriptor.getFormat().getReaderFactory();
- return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
+ return readerFactory.open(descriptor, components, metadata, maxDataAge, sstableMetadata, openReason, header);
}
protected SSTableReader(final Descriptor desc,
Set<Component> components,
CFMetaData metadata,
- IPartitioner partitioner,
long maxDataAge,
StatsMetadata sstableMetadata,
OpenReason openReason,
SerializationHeader header)
{
- super(desc, components, metadata, partitioner);
+ super(desc, components, metadata);
this.sstableMetadata = sstableMetadata;
this.header = header;
this.maxDataAge = maxDataAge;
@@ -814,7 +800,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
{
ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex);
- DecoratedKey decoratedKey = partitioner.decorateKey(key);
+ DecoratedKey decoratedKey = decorateKey(key);
if (first == null)
first = decoratedKey;
last = decoratedKey;
@@ -832,7 +818,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
if (!summaryLoaded)
- indexSummary = summaryBuilder.build(partitioner);
+ indexSummary = summaryBuilder.build(getPartitioner());
}
}
@@ -862,10 +848,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
{
iStream = new DataInputStream(new FileInputStream(summariesFile));
indexSummary = IndexSummary.serializer.deserialize(
- iStream, partitioner, descriptor.version.hasSamplingLevel(),
+ iStream, getPartitioner(), descriptor.version.hasSamplingLevel(),
metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
- first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
- last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+ first = decorateKey(ByteBufferUtil.readWithLength(iStream));
+ last = decorateKey(ByteBufferUtil.readWithLength(iStream));
ibuilder.deserializeBounds(iStream);
dbuilder.deserializeBounds(iStream);
}
@@ -1064,7 +1050,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
SSTableReader replacement = internalOpen(descriptor,
components,
metadata,
- partitioner,
ifile != null ? ifile.sharedCopy() : null,
dfile.sharedCopy(),
newSummary,
@@ -1168,7 +1153,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
else if (samplingLevel < indexSummary.getSamplingLevel())
{
// we can use the existing index summary to make a smaller one
- newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
+ newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, getPartitioner());
try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression))
@@ -1203,11 +1188,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
long indexPosition;
while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
{
- summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
+ summaryBuilder.maybeAddEntry(decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
RowIndexEntry.Serializer.skip(primaryIndex);
}
- return summaryBuilder.build(partitioner);
+ return summaryBuilder.build(getPartitioner());
}
}
finally
@@ -1304,8 +1289,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata();
- //We need the parent cf metadata
- String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
+ // We need the parent cf metadata
+ String cfName = metadata.isIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName));
return cmd;
@@ -1477,7 +1462,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public DecoratedKey next()
{
byte[] bytes = indexSummary.getKey(idx++);
- return partitioner.decorateKey(ByteBuffer.wrap(bytes));
+ return decorateKey(ByteBuffer.wrap(bytes));
}
public void remove()
@@ -1619,7 +1604,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
while (!in.isEOF())
{
ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
- DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
+ DecoratedKey indexDecoratedKey = decorateKey(indexKey);
if (indexDecoratedKey.compareTo(token) > 0)
return indexDecoratedKey;
@@ -2300,7 +2285,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public abstract SSTableReader open(final Descriptor descriptor,
Set<Component> components,
CFMetaData metadata,
- IPartitioner partitioner,
Long maxDataAge,
StatsMetadata sstableMetadata,
OpenReason openReason,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 08a9dcc..fa691b8 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -27,13 +27,11 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
@@ -74,11 +72,10 @@ public abstract class SSTableWriter extends SSTable implements Transactional
long keyCount,
long repairedAt,
CFMetaData metadata,
- IPartitioner partitioner,
MetadataCollector metadataCollector,
SerializationHeader header)
{
- super(descriptor, components(metadata), metadata, partitioner);
+ super(descriptor, components(metadata), metadata);
this.keyCount = keyCount;
this.repairedAt = repairedAt;
this.metadataCollector = metadataCollector;
@@ -90,19 +87,18 @@ public abstract class SSTableWriter extends SSTable implements Transactional
Long keyCount,
Long repairedAt,
CFMetaData metadata,
- IPartitioner partitioner,
MetadataCollector metadataCollector,
SerializationHeader header,
LifecycleTransaction txn)
{
Factory writerFactory = descriptor.getFormat().getWriterFactory();
- return writerFactory.open(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header, txn);
+ return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn);
}
public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
{
CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
- return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, DatabaseDescriptor.getPartitioner(), header, txn);
+ return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, txn);
}
public static SSTableWriter create(CFMetaData metadata,
@@ -110,12 +106,11 @@ public abstract class SSTableWriter extends SSTable implements Transactional
long keyCount,
long repairedAt,
int sstableLevel,
- IPartitioner partitioner,
SerializationHeader header,
LifecycleTransaction txn)
{
MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
- return create(descriptor, keyCount, repairedAt, metadata, partitioner, collector, header, txn);
+ return create(descriptor, keyCount, repairedAt, metadata, collector, header, txn);
}
public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header,LifecycleTransaction txn)
@@ -255,7 +250,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
protected Map<MetadataType, MetadataComponent> finalizeMetadata()
{
- return metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
+ return metadataCollector.finalizeMetadata(getPartitioner().getClass().getCanonicalName(),
metadata.getBloomFilterFpChance(),
repairedAt,
header);
@@ -287,7 +282,6 @@ public abstract class SSTableWriter extends SSTable implements Transactional
long keyCount,
long repairedAt,
CFMetaData metadata,
- IPartitioner partitioner,
MetadataCollector metadataCollector,
SerializationHeader header,
LifecycleTransaction txn);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index a072d4d..860cd9f 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -23,7 +23,6 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
@@ -86,21 +85,20 @@ public class BigFormat implements SSTableFormat
long keyCount,
long repairedAt,
CFMetaData metadata,
- IPartitioner partitioner,
MetadataCollector metadataCollector,
SerializationHeader header,
LifecycleTransaction txn)
{
- return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header, txn);
+ return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn);
}
}
static class ReaderFactory extends SSTableReader.Factory
{
@Override
- public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header)
+ public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header)
{
- return new BigTableReader(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
+ return new BigTableReader(descriptor, components, metadata, maxDataAge, sstableMetadata, openReason, header);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index b539c79..87608fd 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -52,9 +52,9 @@ public class BigTableReader extends SSTableReader
{
private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class);
- BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header)
+ BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header)
{
- super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
+ super(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header);
}
public SliceableUnfilteredRowIterator iterator(DecoratedKey key, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
@@ -201,7 +201,7 @@ public class BigTableReader extends SSTableReader
}
else
{
- DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
+ DecoratedKey indexDecoratedKey = decorateKey(indexKey);
int comparison = indexDecoratedKey.compareTo(key);
int v = op.apply(comparison);
opSatisfied = (v == 0);
@@ -227,7 +227,7 @@ public class BigTableReader extends SSTableReader
// expensive sanity check! see CASSANDRA-4687
try (FileDataInput fdi = dfile.getSegment(indexEntry.position))
{
- DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi));
+ DecoratedKey keyInDisk = decorateKey(ByteBufferUtil.readWithShortLength(fdi));
if (!keyInDisk.equals(key))
throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
}