You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/08/01 00:35:36 UTC
[1/4] cassandra git commit: Revert "Revert "Stop accessing the
partitioner directly via StorageService""
Repository: cassandra
Updated Branches:
refs/heads/trunk 43139f866 -> 0a08525ad
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index ee51a4d..25b9cde 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -22,8 +22,6 @@ import java.io.File;
import java.io.IOError;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ExecutionException;
@@ -44,9 +42,9 @@ import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.compaction.Scrubber;
-import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
@@ -55,14 +53,10 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableRewriter;
-import org.apache.cassandra.io.sstable.SSTableTxnWriter;
+import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.junit.Assert.*;
@@ -79,6 +73,8 @@ public class ScrubTest
public static final String CF_UUID = "UUIDKeys";
public static final String CF_INDEX1 = "Indexed1";
public static final String CF_INDEX2 = "Indexed2";
+ public static final String CF_INDEX1_BYTEORDERED = "Indexed1_ordered";
+ public static final String CF_INDEX2_BYTEORDERED = "Indexed2_ordered";
public static final String COL_INDEX = "birthdate";
public static final String COL_NON_INDEX = "notbirthdate";
@@ -98,7 +94,9 @@ public class ScrubTest
.compressionParameters(SchemaLoader.getCompressionParameters(COMPRESSION_CHUNK_LENGTH)),
SchemaLoader.standardCFMD(KEYSPACE, CF_UUID, 0, UUIDType.instance),
SchemaLoader.keysIndexCFMD(KEYSPACE, CF_INDEX1, true),
- SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEX2, true));
+ SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEX2, true),
+ SchemaLoader.keysIndexCFMD(KEYSPACE, CF_INDEX1_BYTEORDERED, true).copy(ByteOrderedPartitioner.instance),
+ SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEX2_BYTEORDERED, true).copy(ByteOrderedPartitioner.instance));
}
@Test
@@ -306,7 +304,7 @@ public class ScrubTest
{
// This test assumes ByteOrderPartitioner to create out-of-order SSTable
IPartitioner oldPartitioner = DatabaseDescriptor.getPartitioner();
- DatabaseDescriptor.setPartitioner(new ByteOrderedPartitioner());
+ DatabaseDescriptor.setPartitionerUnsafe(new ByteOrderedPartitioner());
// Create out-of-order SSTable
File tempDir = File.createTempFile("ScrubTest.testScrubOutOfOrder", "").getParentFile();
@@ -380,7 +378,7 @@ public class ScrubTest
{
FileUtils.deleteRecursive(tempDataDir);
// reset partitioner
- DatabaseDescriptor.setPartitioner(oldPartitioner);
+ DatabaseDescriptor.setPartitionerUnsafe(oldPartitioner);
}
}
@@ -394,9 +392,9 @@ public class ScrubTest
CompressionMetadata compData = CompressionMetadata.create(sstable.getFilename());
CompressionMetadata.Chunk chunk1 = compData.chunkFor(
- sstable.getPosition(PartitionPosition.ForKey.get(key1, sstable.partitioner), SSTableReader.Operator.EQ).position);
+ sstable.getPosition(PartitionPosition.ForKey.get(key1, sstable.getPartitioner()), SSTableReader.Operator.EQ).position);
CompressionMetadata.Chunk chunk2 = compData.chunkFor(
- sstable.getPosition(PartitionPosition.ForKey.get(key2, sstable.partitioner), SSTableReader.Operator.EQ).position);
+ sstable.getPosition(PartitionPosition.ForKey.get(key2, sstable.getPartitioner()), SSTableReader.Operator.EQ).position);
startPosition = Math.min(chunk1.offset, chunk2.offset);
endPosition = Math.max(chunk1.offset + chunk1.length, chunk2.offset + chunk2.length);
@@ -405,8 +403,8 @@ public class ScrubTest
}
else
{ // overwrite with garbage from key1 to key2
- long row0Start = sstable.getPosition(PartitionPosition.ForKey.get(key1, sstable.partitioner), SSTableReader.Operator.EQ).position;
- long row1Start = sstable.getPosition(PartitionPosition.ForKey.get(key2, sstable.partitioner), SSTableReader.Operator.EQ).position;
+ long row0Start = sstable.getPosition(PartitionPosition.ForKey.get(key1, sstable.getPartitioner()), SSTableReader.Operator.EQ).position;
+ long row1Start = sstable.getPosition(PartitionPosition.ForKey.get(key2, sstable.getPartitioner()), SSTableReader.Operator.EQ).position;
startPosition = Math.min(row0Start, row1Start);
endPosition = Math.max(row0Start, row1Start);
}
@@ -547,28 +545,24 @@ public class ScrubTest
{
//If the partitioner preserves the order then SecondaryIndex uses BytesType comparator,
// otherwise it uses LocalByPartitionerType
- setKeyComparator(BytesType.instance);
- testScrubIndex(CF_INDEX1, COL_INDEX, false, true);
+ testScrubIndex(CF_INDEX1_BYTEORDERED, COL_INDEX, false, true);
}
@Test /* CASSANDRA-5174 */
public void testScrubCompositeIndex_preserveOrder() throws IOException, ExecutionException, InterruptedException
{
- setKeyComparator(BytesType.instance);
- testScrubIndex(CF_INDEX2, COL_INDEX, true, true);
+ testScrubIndex(CF_INDEX2_BYTEORDERED, COL_INDEX, true, true);
}
@Test /* CASSANDRA-5174 */
public void testScrubKeysIndex() throws IOException, ExecutionException, InterruptedException
{
- setKeyComparator(new LocalByPartionerType(StorageService.getPartitioner()));
testScrubIndex(CF_INDEX1, COL_INDEX, false, true);
}
@Test /* CASSANDRA-5174 */
public void testScrubCompositeIndex() throws IOException, ExecutionException, InterruptedException
{
- setKeyComparator(new LocalByPartionerType(StorageService.getPartitioner()));
testScrubIndex(CF_INDEX2, COL_INDEX, true, true);
}
@@ -590,33 +584,6 @@ public class ScrubTest
testScrubIndex(CF_INDEX1, COL_INDEX, false, true, true);
}
- /** The SecondaryIndex class is used for custom indexes so to avoid
- * making a public final field into a private field with getters
- * and setters, we resort to this hack in order to test it properly
- * since it can have two values which influence the scrubbing behavior.
- * @param comparator - the key comparator we want to test
- */
- private void setKeyComparator(AbstractType<?> comparator)
- {
- try
- {
- Field keyComparator = SecondaryIndex.class.getDeclaredField("keyComparator");
- keyComparator.setAccessible(true);
- int modifiers = keyComparator.getModifiers();
- Field modifierField = keyComparator.getClass().getDeclaredField("modifiers");
- modifiers = modifiers & ~Modifier.FINAL;
- modifierField.setAccessible(true);
- modifierField.setInt(keyComparator, modifiers);
-
- keyComparator.set(null, comparator);
- }
- catch (Exception ex)
- {
- fail("Failed to change key comparator in secondary index : " + ex.getMessage());
- ex.printStackTrace();
- }
- }
-
private void testScrubIndex(String cfName, String colName, boolean composite, boolean ... scrubs)
throws IOException, ExecutionException, InterruptedException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/db/VerifyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java
index 2e659af..f460cb5 100644
--- a/test/unit/org/apache/cassandra/db/VerifyTest.java
+++ b/test/unit/org/apache/cassandra/db/VerifyTest.java
@@ -303,8 +303,8 @@ public class VerifyTest
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
// overwrite one row with garbage
- long row0Start = sstable.getPosition(PartitionPosition.ForKey.get(ByteBufferUtil.bytes("0"), sstable.partitioner), SSTableReader.Operator.EQ).position;
- long row1Start = sstable.getPosition(PartitionPosition.ForKey.get(ByteBufferUtil.bytes("1"), sstable.partitioner), SSTableReader.Operator.EQ).position;
+ long row0Start = sstable.getPosition(PartitionPosition.ForKey.get(ByteBufferUtil.bytes("0"), cfs.getPartitioner()), SSTableReader.Operator.EQ).position;
+ long row1Start = sstable.getPosition(PartitionPosition.ForKey.get(ByteBufferUtil.bytes("1"), cfs.getPartitioner()), SSTableReader.Operator.EQ).position;
long startPosition = row0Start < row1Start ? row0Start : row1Start;
long endPosition = row0Start < row1Start ? row1Start : row0Start;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
index 1fc0c01..360c663 100644
--- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
@@ -215,7 +215,7 @@ public class TTLExpiryTest
cfs.enableAutoCompaction(true);
assertEquals(1, cfs.getLiveSSTables().size());
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
- ISSTableScanner scanner = sstable.getScanner(ColumnFilter.all(sstable.metadata), DataRange.allData(sstable.partitioner), false);
+ ISSTableScanner scanner = sstable.getScanner(ColumnFilter.all(sstable.metadata), DataRange.allData(cfs.getPartitioner()), false);
assertTrue(scanner.hasNext());
while(scanner.hasNext())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
index 2153567..44f4d30 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -145,7 +145,6 @@ public class RealTransactionsTest extends SchemaLoader
String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
- .withPartitioner(StorageService.getPartitioner())
.inDirectory(cfs.directories.getDirectoryForNewSSTables())
.forTable(String.format(schema, cfs.keyspace.getName(), cfs.name))
.using(String.format(query, cfs.keyspace.getName(), cfs.name))
@@ -178,7 +177,6 @@ public class RealTransactionsTest extends SchemaLoader
0,
0,
0,
- DatabaseDescriptor.getPartitioner(),
SerializationHeader.make(cfs.metadata, txn.originals()),
txn));
while (ci.hasNext())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java
index 4105800..4339877 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.compaction.*;
-import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -523,12 +522,11 @@ public class TransactionLogsTest extends AbstractTransactionalTest
SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.EMPTY_LIST);
StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
- .finalizeMetadata(Murmur3Partitioner.instance.getClass().getCanonicalName(), 0.01f, -1, header)
+ .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, -1, header)
.get(MetadataType.STATS);
SSTableReader reader = SSTableReader.internalOpen(descriptor,
components,
cfs.metadata,
- Murmur3Partitioner.instance,
dFile,
iFile,
MockSchema.indexSummary.sharedCopy(),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index a1ea0e4..9b1fa01 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -58,8 +58,7 @@ public class BootStrapperTest
@BeforeClass
public static void setup() throws ConfigurationException
{
- oldPartitioner = DatabaseDescriptor.getPartitioner();
- DatabaseDescriptor.setPartitioner(Murmur3Partitioner.instance);
+ oldPartitioner = StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
SchemaLoader.startGossiper();
SchemaLoader.prepareServer();
SchemaLoader.schemaDefinition("BootStrapperTest");
@@ -68,7 +67,7 @@ public class BootStrapperTest
@AfterClass
public static void tearDown()
{
- DatabaseDescriptor.setPartitioner(oldPartitioner);
+ DatabaseDescriptor.setPartitionerUnsafe(oldPartitioner);
}
@Test
@@ -87,12 +86,12 @@ public class BootStrapperTest
private RangeStreamer testSourceTargetComputation(String keyspaceName, int numOldNodes, int replicationFactor) throws UnknownHostException
{
StorageService ss = StorageService.instance;
+ TokenMetadata tmd = ss.getTokenMetadata();
generateFakeEndpoints(numOldNodes);
- Token myToken = StorageService.getPartitioner().getRandomToken();
+ Token myToken = tmd.partitioner.getRandomToken();
InetAddress myEndpoint = InetAddress.getByName("127.0.0.1");
- TokenMetadata tmd = ss.getTokenMetadata();
assertEquals(numOldNodes, tmd.sortedTokens().size());
RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, "Bootstrap", true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore());
IFailureDetector mockFailureDetector = new IFailureDetector()
@@ -136,7 +135,7 @@ public class BootStrapperTest
private void generateFakeEndpoints(TokenMetadata tmd, int numOldNodes, int numVNodes) throws UnknownHostException
{
tmd.clearUnsafe();
- IPartitioner p = StorageService.getPartitioner();
+ IPartitioner p = tmd.partitioner;
for (int i = 1; i <= numOldNodes; i++)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/dht/KeyCollisionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/KeyCollisionTest.java b/test/unit/org/apache/cassandra/dht/KeyCollisionTest.java
index 3cda1d3..ade6ec1 100644
--- a/test/unit/org/apache/cassandra/dht/KeyCollisionTest.java
+++ b/test/unit/org/apache/cassandra/dht/KeyCollisionTest.java
@@ -18,12 +18,7 @@
package org.apache.cassandra.dht;
import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Random;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -31,23 +26,17 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.BufferDecoratedKey;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RowUpdateBuilder;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.IntegerType;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
/**
* Test cases where multiple keys collides, ie have the same token.
@@ -65,8 +54,7 @@ public class KeyCollisionTest
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
- oldPartitioner = DatabaseDescriptor.getPartitioner();
- DatabaseDescriptor.setPartitioner(LengthPartitioner.instance);
+ oldPartitioner = StorageService.instance.setPartitionerUnsafe(LengthPartitioner.instance);
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE1,
KeyspaceParams.simple(1),
@@ -76,7 +64,7 @@ public class KeyCollisionTest
@AfterClass
public static void tearDown()
{
- DatabaseDescriptor.setPartitioner(oldPartitioner);
+ DatabaseDescriptor.setPartitionerUnsafe(oldPartitioner);
}
@Test
@@ -136,122 +124,4 @@ public class KeyCollisionTest
return 0;
}
}
-
- public static class LengthPartitioner implements IPartitioner
- {
- public static final BigInteger ZERO = new BigInteger("0");
- public static final BigIntegerToken MINIMUM = new BigIntegerToken("-1");
-
- public static LengthPartitioner instance = new LengthPartitioner();
-
- public DecoratedKey decorateKey(ByteBuffer key)
- {
- return new BufferDecoratedKey(getToken(key), key);
- }
-
- public BigIntegerToken midpoint(Token ltoken, Token rtoken)
- {
- // the symbolic MINIMUM token should act as ZERO: the empty bit array
- BigInteger left = ltoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken)ltoken).token;
- BigInteger right = rtoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken)rtoken).token;
- Pair<BigInteger,Boolean> midpair = FBUtilities.midpoint(left, right, 127);
- // discard the remainder
- return new BigIntegerToken(midpair.left);
- }
-
- public BigIntegerToken getMinimumToken()
- {
- return MINIMUM;
- }
-
- public BigIntegerToken getRandomToken()
- {
- return new BigIntegerToken(BigInteger.valueOf(new Random().nextInt(15)));
- }
-
- private final Token.TokenFactory tokenFactory = new Token.TokenFactory() {
- public ByteBuffer toByteArray(Token token)
- {
- BigIntegerToken bigIntegerToken = (BigIntegerToken) token;
- return ByteBuffer.wrap(bigIntegerToken.token.toByteArray());
- }
-
- public Token fromByteArray(ByteBuffer bytes)
- {
- return new BigIntegerToken(new BigInteger(ByteBufferUtil.getArray(bytes)));
- }
-
- public String toString(Token token)
- {
- BigIntegerToken bigIntegerToken = (BigIntegerToken) token;
- return bigIntegerToken.token.toString();
- }
-
- public Token fromString(String string)
- {
- return new BigIntegerToken(new BigInteger(string));
- }
-
- public void validate(String token) {}
- };
-
- public Token.TokenFactory getTokenFactory()
- {
- return tokenFactory;
- }
-
- public boolean preservesOrder()
- {
- return false;
- }
-
- public BigIntegerToken getToken(ByteBuffer key)
- {
- if (key.remaining() == 0)
- return MINIMUM;
- return new BigIntegerToken(BigInteger.valueOf(key.remaining()));
- }
-
- public Map<Token, Float> describeOwnership(List<Token> sortedTokens)
- {
- // allTokens will contain the count and be returned, sorted_ranges is shorthand for token<->token math.
- Map<Token, Float> allTokens = new HashMap<Token, Float>();
- List<Range<Token>> sortedRanges = new ArrayList<Range<Token>>();
-
- // this initializes the counts to 0 and calcs the ranges in order.
- Token lastToken = sortedTokens.get(sortedTokens.size() - 1);
- for (Token node : sortedTokens)
- {
- allTokens.put(node, new Float(0.0));
- sortedRanges.add(new Range<Token>(lastToken, node));
- lastToken = node;
- }
-
- for (String ks : Schema.instance.getKeyspaces())
- {
- for (CFMetaData cfmd : Schema.instance.getTables(ks))
- {
- for (Range<Token> r : sortedRanges)
- {
- // Looping over every KS:CF:Range, get the splits size and add it to the count
- allTokens.put(r.right, allTokens.get(r.right) + StorageService.instance.getSplits(ks, cfmd.cfName, r, 1).size());
- }
- }
- }
-
- // Sum every count up and divide count/total for the fractional ownership.
- Float total = new Float(0.0);
- for (Float f : allTokens.values())
- total += f;
- for (Map.Entry<Token, Float> row : allTokens.entrySet())
- allTokens.put(row.getKey(), row.getValue() / total);
-
- return allTokens;
- }
-
- public AbstractType<?> getTokenValidator()
- {
- return IntegerType.instance;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
new file mode 100644
index 0000000..40a6774
--- /dev/null
+++ b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.BufferDecoratedKey;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.PartitionerDefinedOrder;
+import org.apache.cassandra.dht.KeyCollisionTest.BigIntegerToken;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+public class LengthPartitioner implements IPartitioner
+{
+ public static final BigInteger ZERO = new BigInteger("0");
+ public static final BigIntegerToken MINIMUM = new BigIntegerToken("-1");
+
+ public static LengthPartitioner instance = new LengthPartitioner();
+
+ public DecoratedKey decorateKey(ByteBuffer key)
+ {
+ return new BufferDecoratedKey(getToken(key), key);
+ }
+
+ public BigIntegerToken midpoint(Token ltoken, Token rtoken)
+ {
+ // the symbolic MINIMUM token should act as ZERO: the empty bit array
+ BigInteger left = ltoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken)ltoken).token;
+ BigInteger right = rtoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken)rtoken).token;
+ Pair<BigInteger,Boolean> midpair = FBUtilities.midpoint(left, right, 127);
+ // discard the remainder
+ return new BigIntegerToken(midpair.left);
+ }
+
+ public BigIntegerToken getMinimumToken()
+ {
+ return MINIMUM;
+ }
+
+ public BigIntegerToken getRandomToken()
+ {
+ return new BigIntegerToken(BigInteger.valueOf(new Random().nextInt(15)));
+ }
+
+ private final Token.TokenFactory tokenFactory = new Token.TokenFactory() {
+ public ByteBuffer toByteArray(Token token)
+ {
+ BigIntegerToken bigIntegerToken = (BigIntegerToken) token;
+ return ByteBuffer.wrap(bigIntegerToken.token.toByteArray());
+ }
+
+ public Token fromByteArray(ByteBuffer bytes)
+ {
+ return new BigIntegerToken(new BigInteger(ByteBufferUtil.getArray(bytes)));
+ }
+
+ public String toString(Token token)
+ {
+ BigIntegerToken bigIntegerToken = (BigIntegerToken) token;
+ return bigIntegerToken.token.toString();
+ }
+
+ public Token fromString(String string)
+ {
+ return new BigIntegerToken(new BigInteger(string));
+ }
+
+ public void validate(String token) {}
+ };
+
+ public Token.TokenFactory getTokenFactory()
+ {
+ return tokenFactory;
+ }
+
+ public boolean preservesOrder()
+ {
+ return false;
+ }
+
+ public BigIntegerToken getToken(ByteBuffer key)
+ {
+ if (key.remaining() == 0)
+ return MINIMUM;
+ return new BigIntegerToken(BigInteger.valueOf(key.remaining()));
+ }
+
+ public Map<Token, Float> describeOwnership(List<Token> sortedTokens)
+ {
+ // allTokens will contain the count and be returned, sorted_ranges is shorthand for token<->token math.
+ Map<Token, Float> allTokens = new HashMap<Token, Float>();
+ List<Range<Token>> sortedRanges = new ArrayList<Range<Token>>();
+
+ // this initializes the counts to 0 and calcs the ranges in order.
+ Token lastToken = sortedTokens.get(sortedTokens.size() - 1);
+ for (Token node : sortedTokens)
+ {
+ allTokens.put(node, new Float(0.0));
+ sortedRanges.add(new Range<Token>(lastToken, node));
+ lastToken = node;
+ }
+
+ for (String ks : Schema.instance.getKeyspaces())
+ {
+ for (CFMetaData cfmd : Schema.instance.getTables(ks))
+ {
+ for (Range<Token> r : sortedRanges)
+ {
+ // Looping over every KS:CF:Range, get the splits size and add it to the count
+ allTokens.put(r.right, allTokens.get(r.right) + StorageService.instance.getSplits(ks, cfmd.cfName, r, 1).size());
+ }
+ }
+ }
+
+ // Sum every count up and divide count/total for the fractional ownership.
+ Float total = new Float(0.0);
+ for (Float f : allTokens.values())
+ total += f;
+ for (Map.Entry<Token, Float> row : allTokens.entrySet())
+ allTokens.put(row.getKey(), row.getValue() / total);
+
+ return allTokens;
+ }
+
+ public AbstractType<?> getTokenValidator()
+ {
+ return IntegerType.instance;
+ }
+
+ public AbstractType<?> partitionOrdering()
+ {
+ return new PartitionerDefinedOrder(this);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java b/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java
index 887c481..cb892a7 100644
--- a/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java
+++ b/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java
@@ -129,7 +129,7 @@ public abstract class PartitionerTestCase
{
// This call initializes StorageService, needed to populate the keyspaces.
// TODO: This points to potential problems in the initialization sequence. Should be solved by CASSANDRA-7837.
- StorageService.getPartitioner();
+ StorageService.instance.getKeyspaces();
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index bab1ace..8b7ad1f 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -19,11 +19,14 @@
package org.apache.cassandra.gms;
import org.apache.cassandra.AbstractSerializationsTester;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+
import org.junit.Test;
import java.io.IOException;
@@ -73,7 +76,9 @@ public class SerializationsTest extends AbstractSerializationsTester
states.put(InetAddress.getByName("127.0.0.2"), Statics.EndpointSt);
GossipDigestAck ack = new GossipDigestAck(Statics.Digests, states);
GossipDigestAck2 ack2 = new GossipDigestAck2(states);
- GossipDigestSyn syn = new GossipDigestSyn("Not a real cluster name", StorageService.getPartitioner().getClass().getCanonicalName(), Statics.Digests);
+ GossipDigestSyn syn = new GossipDigestSyn("Not a real cluster name",
+ StorageService.instance.getTokenMetadata().partitioner.getClass().getCanonicalName(),
+ Statics.Digests);
DataOutputStreamPlus out = getOutput("gms.Gossip.bin");
for (GossipDigest gd : Statics.Digests)
@@ -111,9 +116,10 @@ public class SerializationsTest extends AbstractSerializationsTester
{
private static HeartBeatState HeartbeatSt = new HeartBeatState(101, 201);
private static EndpointState EndpointSt = new EndpointState(HeartbeatSt);
- private static VersionedValue.VersionedValueFactory vvFact = new VersionedValue.VersionedValueFactory(StorageService.getPartitioner());
+ private static IPartitioner partitioner = StorageService.instance.getTokenMetadata().partitioner;
+ private static VersionedValue.VersionedValueFactory vvFact = new VersionedValue.VersionedValueFactory(partitioner);
private static VersionedValue vv0 = vvFact.load(23d);
- private static VersionedValue vv1 = vvFact.bootstrapping(Collections.<Token>singleton(StorageService.getPartitioner().getRandomToken()));
+ private static VersionedValue vv1 = vvFact.bootstrapping(Collections.<Token>singleton(partitioner.getRandomToken()));
private static List<GossipDigest> Digests = new ArrayList<GossipDigest>();
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
index 08de62f..357298e 100644
--- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.rows.EncodingStats;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index 7bc21ee..2e9768e 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -26,19 +26,20 @@ import java.util.UUID;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.*;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.OutputHandler;
@@ -65,78 +66,79 @@ public class CQLSSTableWriterTest
@Test
public void testUnsortedWriter() throws Exception
{
- String KS = "cql_keyspace";
- String TABLE = "table1";
-
- File tempdir = Files.createTempDir();
- File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
- assert dataDir.mkdirs();
-
- String schema = "CREATE TABLE cql_keyspace.table1 ("
- + " k int PRIMARY KEY,"
- + " v1 text,"
- + " v2 int"
- + ")";
- String insert = "INSERT INTO cql_keyspace.table1 (k, v1, v2) VALUES (?, ?, ?)";
- CQLSSTableWriter writer = CQLSSTableWriter.builder()
- .inDirectory(dataDir)
- .forTable(schema)
- .withPartitioner(StorageService.getPartitioner())
- .using(insert).build();
-
- writer.addRow(0, "test1", 24);
- writer.addRow(1, "test2", 44);
- writer.addRow(2, "test3", 42);
- writer.addRow(ImmutableMap.<String, Object>of("k", 3, "v2", 12));
+ try (AutoCloseable switcher = Util.switchPartitioner(ByteOrderedPartitioner.instance))
+ {
+ String KS = "cql_keyspace";
+ String TABLE = "table1";
+
+ File tempdir = Files.createTempDir();
+ File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
+ assert dataDir.mkdirs();
+
+ String schema = "CREATE TABLE cql_keyspace.table1 ("
+ + " k int PRIMARY KEY,"
+ + " v1 text,"
+ + " v2 int"
+ + ")";
+ String insert = "INSERT INTO cql_keyspace.table1 (k, v1, v2) VALUES (?, ?, ?)";
+ CQLSSTableWriter writer = CQLSSTableWriter.builder()
+ .inDirectory(dataDir)
+ .forTable(schema)
+ .using(insert).build();
- writer.close();
+ writer.addRow(0, "test1", 24);
+ writer.addRow(1, "test2", 44);
+ writer.addRow(2, "test3", 42);
+ writer.addRow(ImmutableMap.<String, Object>of("k", 3, "v2", 12));
- SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
- {
- private String keyspace;
+ writer.close();
- public void init(String keyspace)
+ SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
{
- this.keyspace = keyspace;
- for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace"))
- addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
- setPartitioner(StorageService.getPartitioner());
- }
+ private String keyspace;
- public CFMetaData getTableMetadata(String cfName)
- {
- return Schema.instance.getCFMetaData(keyspace, cfName);
- }
- }, new OutputHandler.SystemOutput(false, false));
+ public void init(String keyspace)
+ {
+ this.keyspace = keyspace;
+ for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace"))
+ addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
+ }
- loader.stream().get();
+ public CFMetaData getTableMetadata(String cfName)
+ {
+ return Schema.instance.getCFMetaData(keyspace, cfName);
+ }
+ }, new OutputHandler.SystemOutput(false, false));
- UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table1;");
- assertEquals(4, rs.size());
+ loader.stream().get();
- Iterator<UntypedResultSet.Row> iter = rs.iterator();
- UntypedResultSet.Row row;
+ UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table1;");
+ assertEquals(4, rs.size());
- row = iter.next();
- assertEquals(0, row.getInt("k"));
- assertEquals("test1", row.getString("v1"));
- assertEquals(24, row.getInt("v2"));
+ Iterator<UntypedResultSet.Row> iter = rs.iterator();
+ UntypedResultSet.Row row;
- row = iter.next();
- assertEquals(1, row.getInt("k"));
- assertEquals("test2", row.getString("v1"));
- //assertFalse(row.has("v2"));
- assertEquals(44, row.getInt("v2"));
+ row = iter.next();
+ assertEquals(0, row.getInt("k"));
+ assertEquals("test1", row.getString("v1"));
+ assertEquals(24, row.getInt("v2"));
- row = iter.next();
- assertEquals(2, row.getInt("k"));
- assertEquals("test3", row.getString("v1"));
- assertEquals(42, row.getInt("v2"));
+ row = iter.next();
+ assertEquals(1, row.getInt("k"));
+ assertEquals("test2", row.getString("v1"));
+ //assertFalse(row.has("v2"));
+ assertEquals(44, row.getInt("v2"));
- row = iter.next();
- assertEquals(3, row.getInt("k"));
- assertEquals(null, row.getBytes("v1")); // Using getBytes because we know it won't NPE
- assertEquals(12, row.getInt("v2"));
+ row = iter.next();
+ assertEquals(2, row.getInt("k"));
+ assertEquals("test3", row.getString("v1"));
+ assertEquals(42, row.getInt("v2"));
+
+ row = iter.next();
+ assertEquals(3, row.getInt("k"));
+ assertEquals(null, row.getBytes("v1")); // Using getBytes because we know it won't NPE
+ assertEquals(12, row.getInt("v2"));
+ }
}
@Test
@@ -159,7 +161,6 @@ public class CQLSSTableWriterTest
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
.forTable(schema)
- .withPartitioner(StorageService.getPartitioner())
.using(insert)
.withBufferSizeInMB(1)
.build();
@@ -195,7 +196,6 @@ public class CQLSSTableWriterTest
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(tempdir)
.forTable(schema)
- .withPartitioner(StorageService.instance.getPartitioner())
.using(insert)
.withBufferSizeInMB(1)
.build();
@@ -234,7 +234,6 @@ public class CQLSSTableWriterTest
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
.forTable(schema)
- .withPartitioner(StorageService.instance.getPartitioner())
.using(insert).build();
try
@@ -289,7 +288,6 @@ public class CQLSSTableWriterTest
this.keyspace = keyspace;
for (Range<Token> range : StorageService.instance.getLocalRanges(KS))
addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
- setPartitioner(StorageService.getPartitioner());
}
public CFMetaData getTableMetadata(String cfName)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
index 7442a22..baa6fad 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
@@ -26,7 +26,7 @@ import java.util.*;
import com.google.common.collect.Lists;
import org.junit.Test;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.Util;
import org.apache.cassandra.db.*;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
@@ -39,11 +39,12 @@ import org.apache.cassandra.utils.Pair;
import static org.apache.cassandra.io.sstable.IndexSummaryBuilder.downsample;
import static org.apache.cassandra.io.sstable.IndexSummaryBuilder.entriesAtSamplingLevel;
import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
-
import static org.junit.Assert.*;
public class IndexSummaryTest
{
+ IPartitioner partitioner = Util.testPartitioner();
+
@Test
public void testGetKey()
{
@@ -82,7 +83,7 @@ public class IndexSummaryTest
dos.writeUTF("JUNK");
FileUtils.closeQuietly(dos);
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dos.toByteArray()));
- IndexSummary is = IndexSummary.serializer.deserialize(dis, DatabaseDescriptor.getPartitioner(), false, 1, 1);
+ IndexSummary is = IndexSummary.serializer.deserialize(dis, partitioner, false, 1, 1);
for (int i = 0; i < 100; i++)
assertEquals(i, is.binarySearch(random.left.get(i)));
// read the junk
@@ -126,13 +127,13 @@ public class IndexSummaryTest
for (int i = 0; i < size; i++)
{
UUID uuid = UUID.randomUUID();
- DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(uuid));
+ DecoratedKey key = partitioner.decorateKey(ByteBufferUtil.bytes(uuid));
list.add(key);
}
Collections.sort(list);
for (int i = 0; i < size; i++)
builder.maybeAddEntry(list.get(i), i);
- IndexSummary summary = builder.build(DatabaseDescriptor.getPartitioner());
+ IndexSummary summary = builder.build(partitioner);
return Pair.create(list, summary);
}
catch (IOException e)
@@ -185,7 +186,7 @@ public class IndexSummaryTest
int downsamplingRound = 1;
for (int samplingLevel = BASE_SAMPLING_LEVEL - 1; samplingLevel >= 1; samplingLevel--)
{
- try (IndexSummary downsampled = downsample(original, samplingLevel, 128, DatabaseDescriptor.getPartitioner());)
+ try (IndexSummary downsampled = downsample(original, samplingLevel, 128, partitioner);)
{
assertEquals(entriesAtSamplingLevel(samplingLevel, original.getMaxNumberOfEntries()), downsampled.size());
@@ -210,7 +211,7 @@ public class IndexSummaryTest
downsamplingRound = 1;
for (int downsampleLevel = BASE_SAMPLING_LEVEL - 1; downsampleLevel >= 1; downsampleLevel--)
{
- IndexSummary downsampled = downsample(previous, downsampleLevel, 128, DatabaseDescriptor.getPartitioner());
+ IndexSummary downsampled = downsample(previous, downsampleLevel, 128, partitioner);
if (previous != original)
previous.close();
assertEquals(entriesAtSamplingLevel(downsampleLevel, original.getMaxNumberOfEntries()), downsampled.size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 7d97ec0..d2922cc 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -136,7 +136,7 @@ public class LegacySSTableTest
private void testStreaming(String version) throws Exception
{
SSTableReader sstable = SSTableReader.open(getDescriptor(version));
- IPartitioner p = StorageService.getPartitioner();
+ IPartitioner p = sstable.getPartitioner();
List<Range<Token>> ranges = new ArrayList<>();
ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("100"))));
ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("100")), p.getMinimumToken()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index 782f7fd..dfd7821 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -86,7 +86,6 @@ public class SSTableLoaderTest
this.keyspace = keyspace;
for (Range<Token> range : StorageService.instance.getLocalRanges(KEYSPACE1))
addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
- setPartitioner(StorageService.getPartitioner());
}
public CFMetaData getTableMetadata(String tableName)
@@ -107,7 +106,6 @@ public class SSTableLoaderTest
try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
- .withPartitioner(StorageService.getPartitioner())
.forTable(String.format(schema, KEYSPACE1, CF_STANDARD1))
.using(String.format(query, KEYSPACE1, CF_STANDARD1))
.build())
@@ -141,7 +139,6 @@ public class SSTableLoaderTest
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
- .withPartitioner(StorageService.getPartitioner())
.forTable(String.format(schema, KEYSPACE1, CF_STANDARD2))
.using(String.format(query, KEYSPACE1, CF_STANDARD2))
.withBufferSizeInMB(1)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 651ed8d..2fe5ef2 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -52,17 +52,14 @@ import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.index.SecondaryIndexSearcher;
import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.LocalPartitioner.LocalToken;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.MmappedSegmentedFile;
import org.apache.cassandra.io.util.SegmentedFile;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.CacheService;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
@@ -79,9 +76,11 @@ public class SSTableReaderTest
public static final String CF_INDEXED = "Indexed1";
public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval";
- static Token t(int i)
+ private IPartitioner partitioner;
+
+ Token t(int i)
{
- return StorageService.getPartitioner().getToken(ByteBufferUtil.bytes(String.valueOf(i)));
+ return partitioner.getToken(ByteBufferUtil.bytes(String.valueOf(i)));
}
@BeforeClass
@@ -104,6 +103,7 @@ public class SSTableReaderTest
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2");
+ partitioner = store.getPartitioner();
// insert data and compact to a single sstable
CompactionManager.instance.disableAutoCompaction();
@@ -124,7 +124,7 @@ public class SSTableReaderTest
// 2 keys
ranges.add(new Range<>(t(2), t(4)));
// wrapping range from key to end
- ranges.add(new Range<>(t(6), StorageService.getPartitioner().getMinimumToken()));
+ ranges.add(new Range<>(t(6), partitioner.getMinimumToken()));
// empty range (should be ignored)
ranges.add(new Range<>(t(9), t(91)));
@@ -146,6 +146,7 @@ public class SSTableReaderTest
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
+ partitioner = store.getPartitioner();
// insert a bunch of data and compact to a single sstable
CompactionManager.instance.disableAutoCompaction();
@@ -166,7 +167,7 @@ public class SSTableReaderTest
{
DecoratedKey dk = Util.dk(String.valueOf(j));
FileDataInput file = sstable.getFileDataInput(sstable.getPosition(dk, SSTableReader.Operator.EQ).position);
- DecoratedKey keyInDisk = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
+ DecoratedKey keyInDisk = sstable.decorateKey(ByteBufferUtil.readWithShortLength(file));
assert keyInDisk.equals(dk) : String.format("%s != %s in %s", keyInDisk, dk, file.getPath());
}
@@ -184,6 +185,7 @@ public class SSTableReaderTest
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
+ partitioner = store.getPartitioner();
for (int j = 0; j < 100; j += 2)
{
@@ -211,6 +213,7 @@ public class SSTableReaderTest
// try to make sure CASSANDRA-8239 never happens again
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
+ partitioner = store.getPartitioner();
for (int j = 0; j < 10; j++)
{
@@ -226,7 +229,7 @@ public class SSTableReaderTest
SSTableReader sstable = store.getLiveSSTables().iterator().next();
assertEquals(0, sstable.getReadMeter().count());
- DecoratedKey key = sstable.partitioner.decorateKey(ByteBufferUtil.bytes("4"));
+ DecoratedKey key = sstable.decorateKey(ByteBufferUtil.bytes("4"));
Util.getAll(Util.cmd(store, key).build());
assertEquals(1, sstable.getReadMeter().count());
@@ -239,6 +242,7 @@ public class SSTableReaderTest
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2");
+ partitioner = store.getPartitioner();
CacheService.instance.keyCache.setCapacity(100);
// insert data and compact to a single sstable
@@ -277,6 +281,7 @@ public class SSTableReaderTest
// Create secondary index and flush to disk
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF_INDEXED);
+ partitioner = store.getPartitioner();
new RowUpdateBuilder(store.metadata, System.currentTimeMillis(), "k1")
.clustering("0")
@@ -293,6 +298,7 @@ public class SSTableReaderTest
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2");
+ partitioner = store.getPartitioner();
CacheService.instance.keyCache.setCapacity(1000);
// insert data and compact to a single sstable
@@ -383,7 +389,7 @@ public class SSTableReaderTest
store.forceBlockingFlush();
ColumnFamilyStore indexCfs = store.indexManager.getIndexForColumn(store.metadata.getColumnDefinition(bytes("birthdate"))).getIndexCfs();
- assert indexCfs.partitioner instanceof LocalPartitioner;
+ assert indexCfs.isIndex();
SSTableReader sstable = indexCfs.getLiveSSTables().iterator().next();
assert sstable.first.getToken() instanceof LocalToken;
@@ -403,6 +409,7 @@ public class SSTableReaderTest
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
+ partitioner = store.getPartitioner();
new RowUpdateBuilder(store.metadata, 0, "k1")
.clustering("xyz")
@@ -428,6 +435,7 @@ public class SSTableReaderTest
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard2");
+ partitioner = store.getPartitioner();
// insert data and compact to a single sstable. The
// number of keys inserted is greater than index_interval
@@ -459,7 +467,7 @@ public class SSTableReaderTest
Set<Component> components = Sets.newHashSet(Component.DATA, Component.PRIMARY_INDEX);
if (sstable.components.contains(Component.COMPRESSION_INFO))
components.add(Component.COMPRESSION_INFO);
- SSTableReader bulkLoaded = SSTableReader.openForBatch(sstable.descriptor, components, store.metadata, sstable.partitioner);
+ SSTableReader bulkLoaded = SSTableReader.openForBatch(sstable.descriptor, components, store.metadata);
sections = bulkLoaded.getPositionsForRanges(ranges);
assert sections.size() == 1 : "Expected to find range in sstable opened for bulk loading";
bulkLoaded.selfRef().release();
@@ -510,7 +518,7 @@ public class SSTableReaderTest
public void run()
{
Iterable<DecoratedKey> results = store.keySamples(
- new Range<>(sstable.partitioner.getMinimumToken(), sstable.partitioner.getToken(key)));
+ new Range<>(sstable.getPartitioner().getMinimumToken(), sstable.getPartitioner().getToken(key)));
assertTrue(results.iterator().hasNext());
}
}));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index cb07d37..fd22941 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -216,7 +216,7 @@ public class SSTableRewriterTest extends SchemaLoader
if (sstable.openReason == SSTableReader.OpenReason.EARLY)
{
SSTableReader c = txn.current(sstables.iterator().next());
- Collection<Range<Token>> r = Arrays.asList(new Range<>(cfs.partitioner.getMinimumToken(), cfs.partitioner.getMinimumToken()));
+ Collection<Range<Token>> r = Arrays.asList(new Range<>(cfs.getPartitioner().getMinimumToken(), cfs.getPartitioner().getMinimumToken()));
List<Pair<Long, Long>> tmplinkPositions = sstable.getPositionsForRanges(r);
List<Pair<Long, Long>> compactingPositions = c.getPositionsForRanges(r);
assertEquals(1, tmplinkPositions.size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index e18dc1d..d77daf0 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -26,6 +26,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.BufferDecoratedKey;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
@@ -40,7 +41,6 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.ValidationComplete;
import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
@@ -55,7 +55,7 @@ public class ValidatorTest
{
private static final String keyspace = "ValidatorTest";
private static final String columnFamily = "Standard1";
- private final IPartitioner partitioner = StorageService.getPartitioner();
+ private static IPartitioner partitioner;
@BeforeClass
public static void defineSchema() throws Exception
@@ -64,6 +64,7 @@ public class ValidatorTest
SchemaLoader.createKeyspace(keyspace,
KeyspaceParams.simple(1),
SchemaLoader.standardCFMD(keyspace, columnFamily));
+ partitioner = Schema.instance.getCFMetaData(keyspace, columnFamily).partitioner;
}
@After
@@ -81,7 +82,6 @@ public class ValidatorTest
final SimpleCondition lock = new SimpleCondition();
MessagingService.instance().addMessageSink(new IMessageSink()
{
- @SuppressWarnings("unchecked")
public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
{
try
@@ -113,7 +113,7 @@ public class ValidatorTest
ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
Validator validator = new Validator(desc, remote, 0);
- MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, 15));
+ MerkleTree tree = new MerkleTree(partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, 15));
validator.prepare(cfs, tree);
// and confirm that the tree was split
@@ -142,7 +142,6 @@ public class ValidatorTest
final SimpleCondition lock = new SimpleCondition();
MessagingService.instance().addMessageSink(new IMessageSink()
{
- @SuppressWarnings("unchecked")
public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index 75b99d4..a61a33e 100644
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -77,8 +77,8 @@ public class ActiveRepairServiceTest
TokenMetadata tmd = StorageService.instance.getTokenMetadata();
tmd.clearUnsafe();
- StorageService.instance.setTokens(Collections.singleton(StorageService.getPartitioner().getRandomToken()));
- tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(), REMOTE);
+ StorageService.instance.setTokens(Collections.singleton(tmd.partitioner.getRandomToken()));
+ tmd.updateNormalToken(tmd.partitioner.getMinimumToken(), REMOTE);
assert tmd.isMember(REMOTE);
}
@@ -208,7 +208,7 @@ public class ActiveRepairServiceTest
for (int i = 1; i <= max; i++)
{
InetAddress endpoint = InetAddress.getByName("127.0.0." + i);
- tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(), endpoint);
+ tmd.updateNormalToken(tmd.partitioner.getRandomToken(), endpoint);
endpoints.add(endpoint);
}
return endpoints;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
index e5b2599..b2bb081 100644
--- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
+++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
@@ -25,12 +25,14 @@ import java.util.*;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.Util.PartitionerSwitcher;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.IPartitioner;
@@ -50,7 +52,7 @@ import static org.junit.Assert.*;
public class LeaveAndBootstrapTest
{
private static final IPartitioner partitioner = RandomPartitioner.instance;
- private static IPartitioner oldPartitioner;
+ private static PartitionerSwitcher partitionerSwitcher;
private static final String KEYSPACE1 = "LeaveAndBootstrapTestKeyspace1";
private static final String KEYSPACE2 = "LeaveAndBootstrapTestKeyspace2";
private static final String KEYSPACE3 = "LeaveAndBootstrapTestKeyspace3";
@@ -59,7 +61,7 @@ public class LeaveAndBootstrapTest
@BeforeClass
public static void defineSchema() throws Exception
{
- oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner);
+ partitionerSwitcher = Util.switchPartitioner(partitioner);
SchemaLoader.loadSchema();
SchemaLoader.schemaDefinition("LeaveAndBootstrapTest");
}
@@ -67,7 +69,7 @@ public class LeaveAndBootstrapTest
@AfterClass
public static void tearDown()
{
- StorageService.instance.setPartitionerUnsafe(oldPartitioner);
+ partitionerSwitcher.close();
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/service/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index b7af1be..80bb452 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -18,19 +18,22 @@
*/
package org.apache.cassandra.service;
-import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collections;
import java.util.UUID;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
+
import org.apache.cassandra.AbstractSerializationsTester;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.Util.PartitionerSwitcher;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.net.MessageIn;
@@ -44,14 +47,26 @@ import org.apache.cassandra.utils.MerkleTree;
public class SerializationsTest extends AbstractSerializationsTester
{
- static
+ private static PartitionerSwitcher partitionerSwitcher;
+ private static UUID RANDOM_UUID;
+ private static Range<Token> FULL_RANGE;
+ private static RepairJobDesc DESC;
+
+ @BeforeClass
+ public static void defineSchema() throws Exception
+ {
+ partitionerSwitcher = Util.switchPartitioner(RandomPartitioner.instance);
+ RANDOM_UUID = UUID.fromString("b5c3d033-75aa-4c2f-a819-947aac7a0c54");
+ FULL_RANGE = new Range<>(Util.testPartitioner().getMinimumToken(), Util.testPartitioner().getMinimumToken());
+ DESC = new RepairJobDesc(getVersion() < MessagingService.VERSION_21 ? null : RANDOM_UUID, RANDOM_UUID, "Keyspace1", "Standard1", FULL_RANGE);
+ }
+
+ @AfterClass
+ public static void tearDown()
{
- System.setProperty("cassandra.partitioner", "RandomPartitioner");
+ partitionerSwitcher.close();
}
- private static final UUID RANDOM_UUID = UUID.fromString("b5c3d033-75aa-4c2f-a819-947aac7a0c54");
- private static final Range<Token> FULL_RANGE = new Range<>(StorageService.getPartitioner().getMinimumToken(), StorageService.getPartitioner().getMinimumToken());
- private static final RepairJobDesc DESC = new RepairJobDesc(getVersion() < MessagingService.VERSION_21 ? null : RANDOM_UUID, RANDOM_UUID, "Keyspace1", "Standard1", FULL_RANGE);
private void testRepairMessageWrite(String fileName, RepairMessage... messages) throws IOException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/service/StorageProxyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/StorageProxyTest.java b/test/unit/org/apache/cassandra/service/StorageProxyTest.java
index c996d5c..801fc53 100644
--- a/test/unit/org/apache/cassandra/service/StorageProxyTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageProxyTest.java
@@ -57,12 +57,12 @@ public class StorageProxyTest
private static PartitionPosition startOf(String key)
{
- return StorageService.getPartitioner().getToken(ByteBufferUtil.bytes(key)).minKeyBound();
+ return token(key).minKeyBound();
}
private static PartitionPosition endOf(String key)
{
- return StorageService.getPartitioner().getToken(ByteBufferUtil.bytes(key)).maxKeyBound();
+ return token(key).maxKeyBound();
}
private static Range<Token> tokenRange(String left, String right)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index cb084a0..85090dc 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -128,7 +128,7 @@ public class StreamingTransferTest
public void testRequestEmpty() throws Exception
{
// requesting empty data should succeed
- IPartitioner p = StorageService.getPartitioner();
+ IPartitioner p = Util.testPartitioner();
List<Range<Token>> ranges = new ArrayList<>();
ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
@@ -215,7 +215,7 @@ public class StreamingTransferTest
private void transferSSTables(SSTableReader sstable) throws Exception
{
- IPartitioner p = StorageService.getPartitioner();
+ IPartitioner p = sstable.getPartitioner();
List<Range<Token>> ranges = new ArrayList<>();
ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
@@ -224,7 +224,7 @@ public class StreamingTransferTest
private void transferRanges(ColumnFamilyStore cfs) throws Exception
{
- IPartitioner p = StorageService.getPartitioner();
+ IPartitioner p = cfs.getPartitioner();
List<Range<Token>> ranges = new ArrayList<>();
// wrapped range
ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0"))));
@@ -418,7 +418,7 @@ public class StreamingTransferTest
SSTableReader sstable2 = SSTableUtils.prepare().write(content);
// transfer the first and last key
- IPartitioner p = StorageService.getPartitioner();
+ IPartitioner p = Util.testPartitioner();
List<Range<Token>> ranges = new ArrayList<>();
ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("test"))));
ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("transfer2")), p.getMinimumToken()));
@@ -447,7 +447,7 @@ public class StreamingTransferTest
public void testTransferOfMultipleColumnFamilies() throws Exception
{
String keyspace = KEYSPACE_CACHEKEY;
- IPartitioner p = StorageService.getPartitioner();
+ IPartitioner p = Util.testPartitioner();
String[] columnFamilies = new String[] { "Standard1", "Standard2", "Standard3" };
List<SSTableReader> ssTableReaders = new ArrayList<>();
@@ -517,7 +517,7 @@ public class StreamingTransferTest
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
cfs.clearUnsafe();
- IPartitioner p = StorageService.getPartitioner();
+ IPartitioner p = Util.testPartitioner();
List<Range<Token>> ranges = new ArrayList<>();
ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key1000"))));
ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key5")), p.getToken(ByteBufferUtil.bytes("key500"))));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
index edb1fb1..deb401b 100644
--- a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
@@ -68,7 +68,7 @@ public class MerkleTreeTest
TOKEN_SCALE = new BigInteger("8");
partitioner = RandomPartitioner.instance;
// TODO need to trickle TokenSerializer
- DatabaseDescriptor.setPartitioner(partitioner);
+ DatabaseDescriptor.setPartitionerUnsafe(partitioner);
mt = new MerkleTree(partitioner, fullRange(), RECOMMENDED_DEPTH, Integer.MAX_VALUE);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/utils/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/SerializationsTest.java b/test/unit/org/apache/cassandra/utils/SerializationsTest.java
index c50f400..cf50769 100644
--- a/test/unit/org/apache/cassandra/utils/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/utils/SerializationsTest.java
@@ -23,13 +23,14 @@ import java.io.IOException;
import org.junit.Assert;
import org.junit.Test;
+
import org.apache.cassandra.AbstractSerializationsTester;
+import org.apache.cassandra.Util;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
import java.io.File;
@@ -37,10 +38,9 @@ import java.io.FileInputStream;
public class SerializationsTest extends AbstractSerializationsTester
{
-
private static void testBloomFilterWrite(boolean offheap, boolean oldBfHashOrder) throws IOException
{
- IPartitioner partitioner = StorageService.getPartitioner();
+ IPartitioner partitioner = Util.testPartitioner();
try (IFilter bf = FilterFactory.getFilter(1000000, 0.0001, offheap, oldBfHashOrder))
{
for (int i = 0; i < 100; i++)
@@ -54,11 +54,10 @@ public class SerializationsTest extends AbstractSerializationsTester
private static void testBloomFilterWrite1000(boolean offheap, boolean oldBfHashOrder) throws IOException
{
- IPartitioner partitioner = StorageService.getPartitioner();
try (IFilter bf = FilterFactory.getFilter(1000000, 0.0001, offheap, oldBfHashOrder))
{
for (int i = 0; i < 1000; i++)
- bf.add(partitioner.decorateKey(Int32Type.instance.decompose(i)));
+ bf.add(Util.dk(Int32Type.instance.decompose(i)));
try (DataOutputStreamPlus out = getOutput(oldBfHashOrder ? "2.1" : "3.0", "utils.BloomFilter1000.bin"))
{
FilterFactory.serialize(bf, out);
@@ -75,19 +74,18 @@ public class SerializationsTest extends AbstractSerializationsTester
testBloomFilterWrite1000(true, true);
}
- IPartitioner partitioner = StorageService.getPartitioner();
try (DataInputStream in = getInput("3.0", "utils.BloomFilter1000.bin");
IFilter filter = FilterFactory.deserialize(in, true, false))
{
boolean present;
for (int i = 0 ; i < 1000 ; i++)
{
- present = filter.isPresent(partitioner.decorateKey(Int32Type.instance.decompose(i)));
+ present = filter.isPresent(Util.dk(Int32Type.instance.decompose(i)));
Assert.assertTrue(present);
}
for (int i = 1000 ; i < 2000 ; i++)
{
- present = filter.isPresent(partitioner.decorateKey(Int32Type.instance.decompose(i)));
+ present = filter.isPresent(Util.dk(Int32Type.instance.decompose(i)));
Assert.assertFalse(present);
}
}
@@ -98,12 +96,12 @@ public class SerializationsTest extends AbstractSerializationsTester
boolean present;
for (int i = 0 ; i < 1000 ; i++)
{
- present = filter.isPresent(partitioner.decorateKey(Int32Type.instance.decompose(i)));
+ present = filter.isPresent(Util.dk(Int32Type.instance.decompose(i)));
Assert.assertTrue(present);
}
for (int i = 1000 ; i < 2000 ; i++)
{
- present = filter.isPresent(partitioner.decorateKey(Int32Type.instance.decompose(i)));
+ present = filter.isPresent(Util.dk(Int32Type.instance.decompose(i)));
Assert.assertFalse(present);
}
}
@@ -117,13 +115,13 @@ public class SerializationsTest extends AbstractSerializationsTester
boolean present;
for (int i = 0 ; i < 1000 ; i++)
{
- present = filter.isPresent(partitioner.decorateKey(Int32Type.instance.decompose(i)));
+ present = filter.isPresent(Util.dk(Int32Type.instance.decompose(i)));
if (!present)
falseNegative ++;
}
for (int i = 1000 ; i < 2000 ; i++)
{
- present = filter.isPresent(partitioner.decorateKey(Int32Type.instance.decompose(i)));
+ present = filter.isPresent(Util.dk(Int32Type.instance.decompose(i)));
if (present)
falsePositive ++;
}
[4/4] cassandra git commit: Revert "Revert "Stop accessing the
partitioner directly via StorageService""
Posted by al...@apache.org.
Revert "Revert "Stop accessing the partitioner directly via StorageService""
This reverts commit a22ce89e868644ea04f0f3dacec05fff1673a345.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0a08525a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0a08525a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0a08525a
Branch: refs/heads/trunk
Commit: 0a08525ad236f78df05c854dead62f300eae271d
Parents: 43139f8
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sat Aug 1 01:33:20 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sat Aug 1 01:33:51 2015 +0300
----------------------------------------------------------------------
.../org/apache/cassandra/config/CFMetaData.java | 64 +++++++-
.../cassandra/config/DatabaseDescriptor.java | 7 +-
.../org/apache/cassandra/config/Schema.java | 2 +-
.../apache/cassandra/cql3/TokenRelation.java | 7 +-
.../cassandra/cql3/functions/TokenFct.java | 10 +-
.../restrictions/StatementRestrictions.java | 2 +-
.../cql3/restrictions/TokenFilter.java | 14 +-
.../cql3/restrictions/TokenRestriction.java | 23 +--
.../cql3/statements/BatchStatement.java | 5 +-
.../cql3/statements/ModificationStatement.java | 7 +-
.../cql3/statements/SelectStatement.java | 2 +-
.../db/AbstractReadCommandBuilder.java | 11 +-
.../apache/cassandra/db/BatchlogManager.java | 7 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 35 ++--
src/java/org/apache/cassandra/db/DataRange.java | 3 +-
.../cassandra/db/HintedHandOffManager.java | 29 ++--
src/java/org/apache/cassandra/db/Memtable.java | 1 -
src/java/org/apache/cassandra/db/Mutation.java | 28 ++--
.../apache/cassandra/db/PartitionPosition.java | 2 +-
.../cassandra/db/PartitionRangeReadCommand.java | 2 +-
.../apache/cassandra/db/RowUpdateBuilder.java | 2 +-
.../db/SinglePartitionNamesCommand.java | 12 ++
.../db/SinglePartitionReadCommand.java | 18 ++-
.../db/SinglePartitionSliceCommand.java | 17 ++
.../org/apache/cassandra/db/SystemKeyspace.java | 18 +--
.../db/compaction/CompactionManager.java | 4 +-
.../db/compaction/LeveledManifest.java | 2 +-
.../cassandra/db/compaction/Scrubber.java | 4 +-
.../cassandra/db/compaction/Upgrader.java | 1 -
.../cassandra/db/compaction/Verifier.java | 2 +-
.../writers/DefaultCompactionWriter.java | 1 -
.../writers/MajorLeveledCompactionWriter.java | 2 -
.../writers/MaxSSTableSizeWriter.java | 2 -
.../SplittingSizeTieredCompactionWriter.java | 2 -
.../AbstractSimplePerColumnSecondaryIndex.java | 4 +-
.../cassandra/db/index/SecondaryIndex.java | 20 +--
.../db/index/composites/CompositesIndex.java | 2 +-
.../CompositesIndexOnClusteringKey.java | 3 +-
.../db/index/composites/CompositesSearcher.java | 2 +-
.../cassandra/db/index/keys/KeysIndex.java | 3 +-
.../cassandra/db/index/keys/KeysSearcher.java | 2 +-
.../db/marshal/LocalByPartionerType.java | 97 ------------
.../db/marshal/PartitionerDefinedOrder.java | 91 +++++++++++
.../db/partitions/AtomicBTreePartition.java | 4 +-
.../db/partitions/PartitionUpdate.java | 108 ++++++++++---
.../rows/UnfilteredRowIteratorSerializer.java | 3 +-
.../cassandra/db/view/MaterializedView.java | 7 +-
.../apache/cassandra/db/view/TemporalRow.java | 2 +-
.../org/apache/cassandra/dht/BootStrapper.java | 12 +-
.../cassandra/dht/ByteOrderedPartitioner.java | 5 +
.../org/apache/cassandra/dht/IPartitioner.java | 6 +
.../apache/cassandra/dht/LocalPartitioner.java | 5 +
.../cassandra/dht/Murmur3Partitioner.java | 7 +
.../dht/OrderPreservingPartitioner.java | 5 +
.../apache/cassandra/dht/RandomPartitioner.java | 7 +
.../org/apache/cassandra/dht/RangeStreamer.java | 2 +-
.../dht/tokenallocator/TokenAllocation.java | 8 +-
src/java/org/apache/cassandra/gms/Gossiper.java | 2 +-
.../io/sstable/AbstractSSTableSimpleWriter.java | 10 +-
.../cassandra/io/sstable/CQLSSTableWriter.java | 15 +-
.../cassandra/io/sstable/KeyIterator.java | 8 +-
.../io/sstable/ReducingKeyIterator.java | 2 +-
.../apache/cassandra/io/sstable/SSTable.java | 21 ++-
.../cassandra/io/sstable/SSTableLoader.java | 20 +--
.../io/sstable/SSTableSimpleUnsortedWriter.java | 5 +-
.../io/sstable/SSTableSimpleWriter.java | 4 +-
.../io/sstable/format/SSTableReader.java | 74 ++++-----
.../io/sstable/format/SSTableWriter.java | 16 +-
.../io/sstable/format/big/BigFormat.java | 8 +-
.../io/sstable/format/big/BigTableReader.java | 8 +-
.../io/sstable/format/big/BigTableScanner.java | 6 +-
.../io/sstable/format/big/BigTableWriter.java | 15 +-
.../apache/cassandra/locator/TokenMetadata.java | 32 +++-
.../apache/cassandra/net/MessagingService.java | 6 +-
.../repair/RepairMessageVerbHandler.java | 4 +-
.../cassandra/schema/LegacySchemaMigrator.java | 16 +-
.../apache/cassandra/schema/SchemaKeyspace.java | 33 ++--
.../apache/cassandra/service/CacheService.java | 4 +-
.../apache/cassandra/service/StorageProxy.java | 11 +-
.../cassandra/service/StorageService.java | 57 ++++---
.../service/pager/RangeNamesQueryPager.java | 4 +-
.../service/pager/RangeSliceQueryPager.java | 3 +-
.../apache/cassandra/service/paxos/Commit.java | 5 +-
.../cassandra/streaming/StreamReader.java | 2 +-
.../cassandra/thrift/CassandraServer.java | 48 +++---
.../cassandra/thrift/ThriftConversion.java | 4 +-
.../cassandra/thrift/ThriftValidation.java | 3 +-
.../utils/NativeSSTableLoaderClient.java | 17 +-
.../io/sstable/CQLSSTableWriterLongTest.java | 1 -
test/unit/org/apache/cassandra/MockSchema.java | 9 +-
.../org/apache/cassandra/UpdateBuilder.java | 2 +-
test/unit/org/apache/cassandra/Util.java | 49 ++++--
.../apache/cassandra/config/CFMetaDataTest.java | 1 -
.../cassandra/cql3/IndexQueryPagingTest.java | 3 -
.../selection/SelectionColumnMappingTest.java | 2 +-
.../entities/FrozenCollectionsTest.java | 5 +-
.../cql3/validation/entities/JsonTest.java | 2 +-
.../SecondaryIndexOnMapEntriesTest.java | 2 +-
.../cql3/validation/entities/UserTypesTest.java | 5 +-
.../validation/operations/SelectLimitTest.java | 2 +-
.../SelectOrderedPartitionerTest.java | 2 +-
.../cassandra/db/BatchlogManagerTest.java | 7 +-
.../org/apache/cassandra/db/RowCacheTest.java | 4 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 63 ++------
.../org/apache/cassandra/db/VerifyTest.java | 4 +-
.../cassandra/db/compaction/TTLExpiryTest.java | 2 +-
.../db/lifecycle/RealTransactionsTest.java | 2 -
.../db/lifecycle/TransactionLogsTest.java | 4 +-
.../apache/cassandra/dht/BootStrapperTest.java | 11 +-
.../apache/cassandra/dht/KeyCollisionTest.java | 134 +---------------
.../apache/cassandra/dht/LengthPartitioner.java | 158 +++++++++++++++++++
.../cassandra/dht/PartitionerTestCase.java | 2 +-
.../cassandra/gms/SerializationsTest.java | 12 +-
.../io/sstable/BigTableWriterTest.java | 1 -
.../io/sstable/CQLSSTableWriterTest.java | 130 ++++++++-------
.../cassandra/io/sstable/IndexSummaryTest.java | 15 +-
.../cassandra/io/sstable/LegacySSTableTest.java | 2 +-
.../cassandra/io/sstable/SSTableLoaderTest.java | 3 -
.../cassandra/io/sstable/SSTableReaderTest.java | 32 ++--
.../io/sstable/SSTableRewriterTest.java | 2 +-
.../apache/cassandra/repair/ValidatorTest.java | 9 +-
.../service/ActiveRepairServiceTest.java | 6 +-
.../service/LeaveAndBootstrapTest.java | 8 +-
.../cassandra/service/SerializationsTest.java | 29 +++-
.../cassandra/service/StorageProxyTest.java | 4 +-
.../streaming/StreamingTransferTest.java | 12 +-
.../apache/cassandra/utils/MerkleTreeTest.java | 2 +-
.../cassandra/utils/SerializationsTest.java | 24 ++-
128 files changed, 1105 insertions(+), 878 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 902b1d2..ffb7b5e 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.compress.LZ4Compressor;
@@ -183,7 +184,10 @@ public final class CFMetaData
private final boolean isCounter;
private final boolean isMaterializedView;
+ private final boolean isIndex;
+
public volatile ClusteringComparator comparator; // bytes, long, timeuuid, utf8, etc. This is built directly from clusteringColumns
+ public final IPartitioner partitioner; // partitioner the table uses
private final Serializers serializers;
@@ -259,7 +263,8 @@ public final class CFMetaData
boolean isMaterializedView,
List<ColumnDefinition> partitionKeyColumns,
List<ColumnDefinition> clusteringColumns,
- PartitionColumns partitionColumns)
+ PartitionColumns partitionColumns,
+ IPartitioner partitioner)
{
this.cfId = cfId;
this.ksName = keyspace;
@@ -284,6 +289,11 @@ public final class CFMetaData
flags.add(Flag.MATERIALIZEDVIEW);
this.flags = Sets.immutableEnumSet(flags);
+ isIndex = cfName.contains(".");
+
+ assert partitioner != null;
+ this.partitioner = partitioner;
+
// A compact table should always have a clustering
assert isCQLTable() || !clusteringColumns.isEmpty() : String.format("For table %s.%s, isDense=%b, isCompound=%b, clustering=%s", ksName, cfName, isDense, isCompound, clusteringColumns);
@@ -329,7 +339,8 @@ public final class CFMetaData
boolean isSuper,
boolean isCounter,
boolean isMaterializedView,
- List<ColumnDefinition> columns)
+ List<ColumnDefinition> columns,
+ IPartitioner partitioner)
{
List<ColumnDefinition> partitions = new ArrayList<>();
List<ColumnDefinition> clusterings = new ArrayList<>();
@@ -364,7 +375,8 @@ public final class CFMetaData
isMaterializedView,
partitions,
clusterings,
- builder.build());
+ builder.build(),
+ partitioner);
}
private static List<AbstractType<?>> extractTypes(List<ColumnDefinition> clusteringColumns)
@@ -466,7 +478,25 @@ public final class CFMetaData
isMaterializedView(),
copy(partitionKeyColumns),
copy(clusteringColumns),
- copy(partitionColumns)),
+ copy(partitionColumns),
+ partitioner),
+ this);
+ }
+
+ public CFMetaData copy(IPartitioner partitioner)
+ {
+ return copyOpts(new CFMetaData(ksName,
+ cfName,
+ cfId,
+ isSuper,
+ isCounter,
+ isDense,
+ isCompound,
+ isMaterializedView,
+ copy(partitionKeyColumns),
+ copy(clusteringColumns),
+ copy(partitionColumns),
+ partitioner),
this);
}
@@ -537,6 +567,19 @@ public final class CFMetaData
return cfName.contains(".");
}
+ /**
+ * true if this CFS contains secondary index data.
+ */
+ public boolean isIndex()
+ {
+ return isIndex;
+ }
+
+ public DecoratedKey decorateKey(ByteBuffer key)
+ {
+ return partitioner.decorateKey(key);
+ }
+
public Map<ByteBuffer, ColumnDefinition> getColumnMetadata()
{
return columnMetadata;
@@ -548,7 +591,7 @@ public final class CFMetaData
*/
public String getParentColumnFamilyName()
{
- return isSecondaryIndex() ? cfName.substring(0, cfName.indexOf('.')) : null;
+ return isIndex ? cfName.substring(0, cfName.indexOf('.')) : null;
}
public double getReadRepairChance()
@@ -1392,6 +1435,7 @@ public final class CFMetaData
private final boolean isSuper;
private final boolean isCounter;
private final boolean isMaterializedView;
+ private IPartitioner partitioner;
private UUID tableId;
@@ -1409,6 +1453,7 @@ public final class CFMetaData
this.isSuper = isSuper;
this.isCounter = isCounter;
this.isMaterializedView = isMaterializedView;
+ this.partitioner = DatabaseDescriptor.getPartitioner();
}
public static Builder create(String keyspace, String table)
@@ -1441,6 +1486,12 @@ public final class CFMetaData
return create(keyspace, table, false, false, true, isCounter);
}
+ public Builder withPartitioner(IPartitioner partitioner)
+ {
+ this.partitioner = partitioner;
+ return this;
+ }
+
public Builder withId(UUID tableId)
{
this.tableId = tableId;
@@ -1554,7 +1605,8 @@ public final class CFMetaData
isMaterializedView,
partitions,
clusterings,
- builder.build());
+ builder.build(),
+ partitioner);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index d32af4d..3ec21d7 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.config;
import java.io.File;
-import java.io.IOException;
import java.net.*;
import java.util.*;
@@ -743,10 +742,12 @@ public class DatabaseDescriptor
return paritionerName;
}
- /* For tests ONLY, don't use otherwise or all hell will break loose */
- public static void setPartitioner(IPartitioner newPartitioner)
+ /* For tests ONLY, don't use otherwise or all hell will break loose. Tests should restore value at the end. */
+ public static IPartitioner setPartitionerUnsafe(IPartitioner newPartitioner)
{
+ IPartitioner old = partitioner;
partitioner = newPartitioner;
+ return old;
}
public static IEndpointSnitch getEndpointSnitch()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index c934327..e1e7380 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -612,7 +612,7 @@ public class Schema
MigrationManager.instance.notifyDropAggregate(uda);
}
- private KeyspaceMetadata update(String keyspaceName, java.util.function.Function<KeyspaceMetadata, KeyspaceMetadata> transformation)
+ private synchronized KeyspaceMetadata update(String keyspaceName, java.util.function.Function<KeyspaceMetadata, KeyspaceMetadata> transformation)
{
KeyspaceMetadata current = getKSMetaData(keyspaceName);
if (current == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/cql3/TokenRelation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/TokenRelation.java b/src/java/org/apache/cassandra/cql3/TokenRelation.java
index 14bd5e0..e0b71fa 100644
--- a/src/java/org/apache/cassandra/cql3/TokenRelation.java
+++ b/src/java/org/apache/cassandra/cql3/TokenRelation.java
@@ -30,7 +30,6 @@ import org.apache.cassandra.cql3.restrictions.Restriction;
import org.apache.cassandra.cql3.restrictions.TokenRestriction;
import org.apache.cassandra.cql3.statements.Bound;
import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.service.StorageService;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkContainsNoDuplicates;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkContainsOnly;
@@ -69,7 +68,7 @@ public final class TokenRelation extends Relation
{
List<ColumnDefinition> columnDefs = getColumnDefinitions(cfm);
Term term = toTerm(toReceivers(cfm, columnDefs), value, cfm.ksName, boundNames);
- return new TokenRestriction.EQRestriction(cfm.getKeyValidatorAsClusteringComparator(), columnDefs, term);
+ return new TokenRestriction.EQRestriction(cfm, columnDefs, term);
}
@Override
@@ -86,7 +85,7 @@ public final class TokenRelation extends Relation
{
List<ColumnDefinition> columnDefs = getColumnDefinitions(cfm);
Term term = toTerm(toReceivers(cfm, columnDefs), value, cfm.ksName, boundNames);
- return new TokenRestriction.SliceRestriction(cfm.getKeyValidatorAsClusteringComparator(), columnDefs, bound, inclusive, term);
+ return new TokenRestriction.SliceRestriction(cfm, columnDefs, bound, inclusive, term);
}
@Override
@@ -159,6 +158,6 @@ public final class TokenRelation extends Relation
return Collections.singletonList(new ColumnSpecification(firstColumn.ksName,
firstColumn.cfName,
new ColumnIdentifier("partition key token", true),
- StorageService.getPartitioner().getTokenValidator()));
+ cfm.partitioner.getTokenValidator()));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
index c76b588..283ac0b 100644
--- a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
+++ b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
@@ -22,23 +22,17 @@ import java.util.List;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.CBuilder;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.service.StorageService;
public class TokenFct extends NativeScalarFunction
{
- // The actual token function depends on the partitioner used
- private static final IPartitioner partitioner = StorageService.getPartitioner();
-
private final CFMetaData cfm;
public TokenFct(CFMetaData cfm)
{
- super("token", partitioner.getTokenValidator(), getKeyTypes(cfm));
+ super("token", cfm.partitioner.getTokenValidator(), getKeyTypes(cfm));
this.cfm = cfm;
}
@@ -61,6 +55,6 @@ public class TokenFct extends NativeScalarFunction
return null;
builder.add(bb);
}
- return partitioner.getTokenFactory().toByteArray(partitioner.getToken(CFMetaData.serializePartitionKey(builder.build())));
+ return cfm.partitioner.getTokenFactory().toByteArray(cfm.partitioner.getToken(CFMetaData.serializePartitionKey(builder.build())));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
index d9fd5e4..ea87db7 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
@@ -378,7 +378,7 @@ public final class StatementRestrictions
*/
public AbstractBounds<PartitionPosition> getPartitionKeyBounds(QueryOptions options) throws InvalidRequestException
{
- IPartitioner p = StorageService.getPartitioner();
+ IPartitioner p = cfm.partitioner;
if (partitionKeyRestrictions.isOnToken())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
index bf3f2f6..3258b26 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.service.StorageService;
import static org.apache.cassandra.cql3.statements.Bound.END;
import static org.apache.cassandra.cql3.statements.Bound.START;
@@ -52,9 +51,9 @@ final class TokenFilter extends ForwardingPrimaryKeyRestrictions
private TokenRestriction tokenRestriction;
/**
- * The partitioner
+ * Partitioner to manage tokens, extracted from tokenRestriction metadata.
*/
- private static final IPartitioner partitioner = StorageService.getPartitioner();
+ private final IPartitioner partitioner;
@Override
protected PrimaryKeyRestrictions getDelegate()
@@ -74,6 +73,7 @@ final class TokenFilter extends ForwardingPrimaryKeyRestrictions
{
this.restrictions = restrictions;
this.tokenRestriction = tokenRestriction;
+ this.partitioner = tokenRestriction.metadata.partitioner;
}
@Override
@@ -144,7 +144,7 @@ final class TokenFilter extends ForwardingPrimaryKeyRestrictions
* @param values the restricted values
* @return the values for which the tokens are not included within the specified range.
*/
- private static List<ByteBuffer> filterWithRangeSet(RangeSet<Token> tokens, List<ByteBuffer> values)
+ private List<ByteBuffer> filterWithRangeSet(RangeSet<Token> tokens, List<ByteBuffer> values)
{
List<ByteBuffer> remaining = new ArrayList<>();
@@ -166,7 +166,7 @@ final class TokenFilter extends ForwardingPrimaryKeyRestrictions
* @param buffers the token restriction values
* @return the range set corresponding to the specified list
*/
- private static RangeSet<Token> toRangeSet(List<ByteBuffer> buffers)
+ private RangeSet<Token> toRangeSet(List<ByteBuffer> buffers)
{
ImmutableRangeSet.Builder<Token> builder = ImmutableRangeSet.builder();
@@ -184,7 +184,7 @@ final class TokenFilter extends ForwardingPrimaryKeyRestrictions
* @return the range set corresponding to the specified slice
* @throws InvalidRequestException if the request is invalid
*/
- private static RangeSet<Token> toRangeSet(TokenRestriction slice, QueryOptions options) throws InvalidRequestException
+ private RangeSet<Token> toRangeSet(TokenRestriction slice, QueryOptions options) throws InvalidRequestException
{
if (slice.hasBound(START))
{
@@ -224,7 +224,7 @@ final class TokenFilter extends ForwardingPrimaryKeyRestrictions
* @param buffer the buffer
* @return the token corresponding to the specified buffer
*/
- private static Token deserializeToken(ByteBuffer buffer)
+ private Token deserializeToken(ByteBuffer buffer)
{
return partitioner.getTokenFactory().fromByteArray(buffer);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
index 0a7721a..56da6da 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
@@ -22,6 +22,7 @@ import java.util.*;
import com.google.common.base.Joiner;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.Term;
@@ -44,16 +45,18 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions
*/
protected final List<ColumnDefinition> columnDefs;
+ final CFMetaData metadata;
+
/**
* Creates a new <code>TokenRestriction</code> that apply to the specified columns.
*
- * @param comparator the clustering comparator
* @param columnDefs the definition of the columns to which apply the token restriction
*/
- public TokenRestriction(ClusteringComparator comparator, List<ColumnDefinition> columnDefs)
+ public TokenRestriction(CFMetaData metadata, List<ColumnDefinition> columnDefs)
{
- super(comparator);
+ super(metadata.getKeyValidatorAsClusteringComparator());
this.columnDefs = columnDefs;
+ this.metadata = metadata;
}
@Override
@@ -154,9 +157,9 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions
{
private final Term value;
- public EQRestriction(ClusteringComparator comparator, List<ColumnDefinition> columnDefs, Term value)
+ public EQRestriction(CFMetaData cfm, List<ColumnDefinition> columnDefs, Term value)
{
- super(comparator, columnDefs);
+ super(cfm, columnDefs);
this.value = value;
}
@@ -190,9 +193,9 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions
{
private final TermSlice slice;
- public SliceRestriction(ClusteringComparator comparator, List<ColumnDefinition> columnDefs, Bound bound, boolean inclusive, Term term)
+ public SliceRestriction(CFMetaData cfm, List<ColumnDefinition> columnDefs, Bound bound, boolean inclusive, Term term)
{
- super(comparator, columnDefs);
+ super(cfm, columnDefs);
slice = TermSlice.newInstance(bound, inclusive, term);
}
@@ -250,7 +253,7 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions
throw invalidRequest("More than one restriction was found for the end bound on %s",
getColumnNamesAsString());
- return new SliceRestriction(comparator, columnDefs, slice.merge(otherSlice.slice));
+ return new SliceRestriction(metadata, columnDefs, slice.merge(otherSlice.slice));
}
@Override
@@ -258,9 +261,9 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions
{
return String.format("SLICE%s", slice);
}
- private SliceRestriction(ClusteringComparator comparator, List<ColumnDefinition> columnDefs, TermSlice slice)
+ private SliceRestriction(CFMetaData cfm, List<ColumnDefinition> columnDefs, TermSlice slice)
{
- super(comparator, columnDefs);
+ super(cfm, columnDefs);
this.slice = slice;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 08a47c0..5d1333c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -39,7 +39,6 @@ import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.transport.messages.ResultMessage;
@@ -260,7 +259,7 @@ public class BatchStatement implements CQLStatement
for (ByteBuffer key : keys)
{
- DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+ DecoratedKey dk = statement.cfm.decorateKey(key);
IMutation mutation = ksMap.get(dk.getKey());
Mutation mut;
if (mutation == null)
@@ -426,7 +425,7 @@ public class BatchStatement implements CQLStatement
throw new IllegalArgumentException("Batch with conditions cannot span multiple partitions (you cannot use IN on the partition key)");
if (key == null)
{
- key = StorageService.getPartitioner().decorateKey(pks.get(0));
+ key = statement.cfm.decorateKey(pks.get(0));
casRequest = new CQL3CasRequest(statement.cfm, key, true, conditionColumns, updatesRegularRows, updatesStaticRow);
}
else if (!key.getKey().equals(pks.get(0)))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 2f3de4c..9f2c952 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -541,7 +541,7 @@ public abstract class ModificationStatement implements CQLStatement
ColumnFilter.selection(toRead),
RowFilter.NONE,
DataLimits.NONE,
- StorageService.getPartitioner().decorateKey(key),
+ key,
new ClusteringIndexNamesFilter(clusterings, false)));
Map<DecoratedKey, Partition> map = new HashMap();
@@ -639,7 +639,7 @@ public abstract class ModificationStatement implements CQLStatement
if (keys.size() > 1)
throw new InvalidRequestException("IN on the partition key is not supported with conditional updates");
- DecoratedKey key = StorageService.getPartitioner().decorateKey(keys.get(0));
+ DecoratedKey key = cfm.decorateKey(keys.get(0));
long now = options.getTimestamp(queryState);
CBuilder cbuilder = createClustering(options);
@@ -820,8 +820,7 @@ public abstract class ModificationStatement implements CQLStatement
for (ByteBuffer key: keys)
{
ThriftValidation.validateKey(cfm, key);
- DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
- PartitionUpdate upd = new PartitionUpdate(cfm, dk, updatedColumns(), 1);
+ PartitionUpdate upd = new PartitionUpdate(cfm, key, updatedColumns(), 1);
addUpdateForKey(upd, clustering, params);
Mutation mut = new Mutation(upd);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 84d621b..94f04b8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -410,7 +410,7 @@ public class SelectStatement implements CQLStatement
for (ByteBuffer key : keys)
{
QueryProcessor.validateKey(key);
- DecoratedKey dk = StorageService.getPartitioner().decorateKey(ByteBufferUtil.clone(key));
+ DecoratedKey dk = cfm.decorateKey(ByteBufferUtil.clone(key));
commands.add(SinglePartitionReadCommand.create(cfm, nowInSec, queriedColumns, rowFilter, limit, dk, filter));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
index 2ddc6ca..5e3b726 100644
--- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
+++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
@@ -24,14 +24,9 @@ import java.util.*;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.dht.*;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
public abstract class AbstractReadCommandBuilder
@@ -312,13 +307,13 @@ public abstract class AbstractReadCommandBuilder
PartitionPosition start = startKey;
if (start == null)
{
- start = StorageService.getPartitioner().getMinimumToken().maxKeyBound();
+ start = cfs.getPartitioner().getMinimumToken().maxKeyBound();
startInclusive = false;
}
PartitionPosition end = endKey;
if (end == null)
{
- end = StorageService.getPartitioner().getMinimumToken().maxKeyBound();
+ end = cfs.getPartitioner().getMinimumToken().maxKeyBound();
endInclusive = true;
}
@@ -341,7 +336,7 @@ public abstract class AbstractReadCommandBuilder
return (DecoratedKey)partitionKey[0];
ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey));
- return StorageService.getPartitioner().decorateKey(key);
+ return metadata.decorateKey(key);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 154a86b..9e90d9d 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -199,8 +199,11 @@ public class BatchlogManager implements BatchlogManagerMBean
private void deleteBatch(UUID id)
{
- Mutation mutation = new Mutation(SystemKeyspace.NAME, StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(id)));
- mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog, mutation.key(), FBUtilities.timestampMicros(), FBUtilities.nowInSeconds()));
+ Mutation mutation = new Mutation(
+ PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog,
+ UUIDType.instance.decompose(id),
+ FBUtilities.timestampMicros(),
+ FBUtilities.nowInSeconds()));
mutation.apply();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 24da365..c4377d6 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -137,7 +137,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public final Keyspace keyspace;
public final String name;
public final CFMetaData metadata;
- public final IPartitioner partitioner;
private final String mbeanName;
@Deprecated
private final String oldMBeanName;
@@ -304,20 +303,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public ColumnFamilyStore(Keyspace keyspace,
String columnFamilyName,
- IPartitioner partitioner,
int generation,
CFMetaData metadata,
Directories directories,
boolean loadSSTables)
{
- this(keyspace, columnFamilyName, partitioner, generation, metadata, directories, loadSSTables, true);
+ this(keyspace, columnFamilyName, generation, metadata, directories, loadSSTables, true);
}
@VisibleForTesting
public ColumnFamilyStore(Keyspace keyspace,
String columnFamilyName,
- IPartitioner partitioner,
int generation,
CFMetaData metadata,
Directories directories,
@@ -331,7 +328,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
this.metadata = metadata;
this.minCompactionThreshold = new DefaultInteger(metadata.getMinCompactionThreshold());
this.maxCompactionThreshold = new DefaultInteger(metadata.getMaxCompactionThreshold());
- this.partitioner = partitioner;
this.directories = directories;
this.indexManager = new SecondaryIndexManager(this);
this.materializedViewManager = new MaterializedViewManager(this);
@@ -349,7 +345,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (data.loadsstables)
{
Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true);
- Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata, this.partitioner);
+ Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata);
data.addInitialSSTables(sstables);
}
@@ -486,12 +482,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public static ColumnFamilyStore createColumnFamilyStore(Keyspace keyspace, String columnFamily, boolean loadSSTables)
{
- return createColumnFamilyStore(keyspace, columnFamily, StorageService.getPartitioner(), Schema.instance.getCFMetaData(keyspace.getName(), columnFamily), loadSSTables);
+ return createColumnFamilyStore(keyspace, columnFamily, Schema.instance.getCFMetaData(keyspace.getName(), columnFamily), loadSSTables);
}
public static synchronized ColumnFamilyStore createColumnFamilyStore(Keyspace keyspace,
String columnFamily,
- IPartitioner partitioner,
CFMetaData metadata,
boolean loadSSTables)
{
@@ -510,7 +505,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Collections.sort(generations);
int value = (generations.size() > 0) ? (generations.get(generations.size() - 1)) : 0;
- return new ColumnFamilyStore(keyspace, columnFamily, partitioner, value, metadata, directories, loadSSTables);
+ return new ColumnFamilyStore(keyspace, columnFamily, value, metadata, directories, loadSSTables);
}
/**
@@ -681,7 +676,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
SSTableReader reader;
try
{
- reader = SSTableReader.open(newDescriptor, entry.getValue(), metadata, partitioner);
+ reader = SSTableReader.open(newDescriptor, entry.getValue(), metadata);
}
catch (IOException e)
{
@@ -1443,7 +1438,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// WARNING: this returns the set of LIVE sstables only, which may be only partially written
public List<String> getSSTablesForKey(String key)
{
- DecoratedKey dk = partitioner.decorateKey(metadata.getKeyValidator().fromString(key));
+ DecoratedKey dk = decorateKey(metadata.getKeyValidator().fromString(key));
try (OpOrder.Group op = readOrdering.start())
{
List<String> files = new ArrayList<>();
@@ -1489,7 +1484,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
keyIter.hasNext(); )
{
RowCacheKey key = keyIter.next();
- DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key));
+ DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.key));
if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges))
invalidateCachedPartition(dk);
}
@@ -1500,7 +1495,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
keyIter.hasNext(); )
{
CounterCacheKey key = keyIter.next();
- DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey));
+ DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.partitionKey));
if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges))
CacheService.instance.counterCache.remove(key);
}
@@ -1618,7 +1613,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (logger.isDebugEnabled())
logger.debug("using snapshot sstable {}", entries.getKey());
// open without tracking hotness
- sstable = SSTableReader.open(entries.getKey(), entries.getValue(), metadata, partitioner, true, false);
+ sstable = SSTableReader.open(entries.getKey(), entries.getValue(), metadata, true, false);
// This is technically not necessary since it's a snapshot but makes things easier
refs.tryRef(sstable);
}
@@ -2080,10 +2075,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return n;
}
+ public IPartitioner getPartitioner()
+ {
+ return metadata.partitioner;
+ }
+
+ public DecoratedKey decorateKey(ByteBuffer key)
+ {
+ return metadata.decorateKey(key);
+ }
+
/** true if this CFS contains secondary index data */
public boolean isIndex()
{
- return partitioner instanceof LocalPartitioner;
+ return metadata.isIndex();
}
public Iterable<ColumnFamilyStore> concatWithIndexes()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index 358b0ac..023f572 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessagingService;
/**
* Groups both the range of partitions to query, and the clustering index filter to
@@ -374,7 +373,7 @@ public class DataRange
public DataRange deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
{
- AbstractBounds<PartitionPosition> range = AbstractBounds.rowPositionSerializer.deserialize(in, MessagingService.globalPartitioner(), version);
+ AbstractBounds<PartitionPosition> range = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version);
ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
if (in.readBoolean())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 6ff880c..73189a6 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -33,9 +33,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.ColumnDefinition;
@@ -46,7 +46,6 @@ import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.FailureDetector;
@@ -131,8 +130,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
UUID hintId = UUIDGen.getTimeUUID();
// serialize the hint with id and version as a composite column name
- DecoratedKey key = StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(targetId));
-
+ ByteBuffer key = UUIDType.instance.decompose(targetId);
Clustering clustering = SystemKeyspace.Hints.comparator.make(hintId, MessagingService.current_version);
ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version));
Cell cell = BufferCell.expiring(hintColumn, now, ttl, FBUtilities.nowInSeconds(), value);
@@ -179,9 +177,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
private static void deleteHint(ByteBuffer tokenBytes, Clustering clustering, long timestamp)
{
- DecoratedKey dk = StorageService.getPartitioner().decorateKey(tokenBytes);
Cell cell = BufferCell.tombstone(hintColumn, timestamp, FBUtilities.nowInSeconds());
- PartitionUpdate upd = PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, dk, BTreeBackedRow.singleCellRow(clustering, cell));
+ PartitionUpdate upd = PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, tokenBytes, BTreeBackedRow.singleCellRow(clustering, cell));
new Mutation(upd).applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
}
@@ -204,8 +201,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
if (!StorageService.instance.getTokenMetadata().isMember(endpoint))
return;
UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint);
- DecoratedKey dk = StorageService.getPartitioner().decorateKey(ByteBuffer.wrap(UUIDGen.decompose(hostId)));
- final Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Hints, dk, System.currentTimeMillis(), FBUtilities.nowInSeconds()));
+ ByteBuffer key = ByteBuffer.wrap(UUIDGen.decompose(hostId));
+ final Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Hints, key, System.currentTimeMillis(), FBUtilities.nowInSeconds()));
// execute asynchronously to avoid blocking caller (which may be processing gossip)
Runnable runnable = new Runnable()
@@ -368,7 +365,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
UUID hostId = Gossiper.instance.getHostId(endpoint);
logger.info("Started hinted handoff for host: {} with IP: {}", hostId, endpoint);
final ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
- DecoratedKey epkey = StorageService.getPartitioner().decorateKey(hostIdBytes);
final AtomicInteger rowsReplayed = new AtomicInteger(0);
@@ -380,7 +376,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
int nowInSec = FBUtilities.nowInSeconds();
try (OpOrder.Group op = hintStore.readOrdering.start();
- RowIterator iter = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(SystemKeyspace.Hints, nowInSec, epkey).queryMemtableAndDisk(hintStore, op), nowInSec))
+ RowIterator iter = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(SystemKeyspace.Hints, nowInSec, hostIdBytes).queryMemtableAndDisk(hintStore, op), nowInSec))
{
List<WriteResponseHandler<Mutation>> responseHandlers = Lists.newArrayList();
@@ -480,7 +476,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
ColumnFilter.all(hintStore.metadata),
RowFilter.NONE,
DataLimits.cqlLimits(Integer.MAX_VALUE, 1),
- DataRange.allData(StorageService.getPartitioner()));
+ DataRange.allData(hintStore.metadata.partitioner));
try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup))
{
@@ -546,12 +542,12 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
public List<String> listEndpointsPendingHints()
{
- Token.TokenFactory tokenFactory = StorageService.getPartitioner().getTokenFactory();
-
// Extract the keys as strings to be reported.
- LinkedList<String> result = new LinkedList<>();
+ List<String> result = new ArrayList<>();
+
ReadCommand cmd = PartitionRangeReadCommand.allDataRead(SystemKeyspace.Hints, FBUtilities.nowInSeconds());
- try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup))
+ try (ReadOrderGroup orderGroup = cmd.startOrderGroup();
+ UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup))
{
while (iter.hasNext())
{
@@ -560,10 +556,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
// We don't delete by range on the hints table, so we don't have to worry about the
// iterator returning only range tombstone marker
if (partition.hasNext())
- result.addFirst(tokenFactory.toString(partition.partitionKey().getToken()));
+ result.add(UUIDType.instance.compose(partition.partitionKey().getKey()).toString());
}
}
}
+
return result;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index ecaf063..5ec9fe5 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -436,7 +436,6 @@ public class Memtable implements Comparable<Memtable>
(long)partitions.size(),
ActiveRepairService.UNREPAIRED_SSTABLE,
cfs.metadata,
- cfs.partitioner,
sstableMetadataCollector,
new SerializationHeader(cfs.metadata, columns, stats),
txn));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index ace114b..709c78f 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.*;
import org.apache.commons.lang3.StringUtils;
@@ -31,7 +32,6 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,6 +107,7 @@ public class Mutation implements IMutation
public Mutation add(PartitionUpdate update)
{
assert update != null;
+ assert update.partitionKey().getPartitioner() == key.getPartitioner();
PartitionUpdate prev = modifications.put(update.metadata().cfId, update);
if (prev != null)
// developer error
@@ -270,15 +271,14 @@ public class Mutation implements IMutation
public Mutation deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
{
- String keyspaceName = null; // will always be set from cf.metadata but javac isn't smart enough to see that
if (version < MessagingService.VERSION_20)
- keyspaceName = in.readUTF();
+ in.readUTF(); // read pre-2.0 keyspace name
- DecoratedKey key = null;
+ ByteBuffer key = null;
int size;
if (version < MessagingService.VERSION_30)
{
- key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+ key = ByteBufferUtil.readWithShortLength(in);
size = in.readInt();
}
else
@@ -288,23 +288,21 @@ public class Mutation implements IMutation
assert size > 0;
+ PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, flag, key);
if (size == 1)
- return new Mutation(PartitionUpdate.serializer.deserialize(in, version, flag, key));
+ return new Mutation(update);
Map<UUID, PartitionUpdate> modifications = new HashMap<>(size);
- PartitionUpdate update = null;
- for (int i = 0; i < size; ++i)
+ DecoratedKey dk = update.partitionKey();
+
+ modifications.put(update.metadata().cfId, update);
+ for (int i = 1; i < size; ++i)
{
- update = PartitionUpdate.serializer.deserialize(in, version, flag, key);
+ update = PartitionUpdate.serializer.deserialize(in, version, flag, dk);
modifications.put(update.metadata().cfId, update);
}
- if (keyspaceName == null)
- keyspaceName = update.metadata().ksName;
- if (key == null)
- key = update.partitionKey();
-
- return new Mutation(keyspaceName, key, modifications);
+ return new Mutation(update.metadata().ksName, dk, modifications);
}
public Mutation deserialize(DataInputPlus in, int version) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/PartitionPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionPosition.java b/src/java/org/apache/cassandra/db/PartitionPosition.java
index afb446d..ac5258d 100644
--- a/src/java/org/apache/cassandra/db/PartitionPosition.java
+++ b/src/java/org/apache/cassandra/db/PartitionPosition.java
@@ -84,7 +84,7 @@ public interface PartitionPosition extends RingPosition<PartitionPosition>
if (kind == Kind.ROW_KEY)
{
ByteBuffer k = ByteBufferUtil.readWithShortLength(in);
- return StorageService.getPartitioner().decorateKey(k);
+ return p.decorateKey(k);
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index d48fca5..18b6950 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -90,7 +90,7 @@ public class PartitionRangeReadCommand extends ReadCommand
ColumnFilter.all(metadata),
RowFilter.NONE,
DataLimits.NONE,
- DataRange.allData(StorageService.getPartitioner()));
+ DataRange.allData(metadata.partitioner));
}
public DataRange dataRange()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
index c06a7f7..e4f05b0 100644
--- a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
+++ b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
@@ -223,7 +223,7 @@ public class RowUpdateBuilder
return (DecoratedKey)partitionKey[0];
ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey));
- return StorageService.getPartitioner().decorateKey(key);
+ return metadata.decorateKey(key);
}
private static PartitionUpdate getOrAdd(CFMetaData metadata, Mutation mutation)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
index 5ffbd55..b0958fc 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.db;
+import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.Sets;
@@ -67,6 +68,17 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
}
+ public SinglePartitionNamesCommand(CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ ByteBuffer key,
+ ClusteringIndexNamesFilter clusteringIndexFilter)
+ {
+ this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, metadata.decorateKey(key), clusteringIndexFilter);
+ }
+
public SinglePartitionNamesCommand copy()
{
return new SinglePartitionNamesCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 3d4e42e..6e9e2d5 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.cache.*;
@@ -57,6 +58,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
F clusteringIndexFilter)
{
super(Kind.SINGLE_PARTITION, isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+ assert partitionKey.getPartitioner() == metadata.partitioner;
this.partitionKey = partitionKey;
this.clusteringIndexFilter = clusteringIndexFilter;
}
@@ -145,6 +147,20 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
return SinglePartitionSliceCommand.create(metadata, nowInSec, key, Slices.ALL);
}
+ /**
+ * Creates a new read command that queries a single partition in its entirety.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ *
+ * @return a newly created read command that queries all the rows of {@code key}.
+ */
+ public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, ByteBuffer key)
+ {
+ return SinglePartitionSliceCommand.create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL);
+ }
+
public DecoratedKey partitionKey()
{
return partitionKey;
@@ -486,7 +502,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
throws IOException
{
- DecoratedKey key = StorageService.getPartitioner().decorateKey(metadata.getKeyValidator().readValue(in));
+ DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in));
ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
if (filter instanceof ClusteringIndexNamesFilter)
return new SinglePartitionNamesCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexNamesFilter)filter);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
index b4cbbd6..bb9a35e 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.db;
+import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.Iterables;
@@ -97,6 +98,22 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
return new SinglePartitionSliceCommand(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
}
+ /**
+ * Creates a new single partition slice command for the provided slices.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param slices the slices of rows to query.
+ *
+ * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
+ * query every column of the table (without limit or row filtering) and be in forward order.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, ByteBuffer key, Slices slices)
+ {
+ return create(metadata, nowInSec, metadata.decorateKey(key), slices);
+ }
+
public SinglePartitionSliceCommand copy()
{
return new SinglePartitionSliceCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index e31feaa..d17eaf7 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -451,11 +451,6 @@ public final class SystemKeyspace
DECOMMISSIONED
}
- private static DecoratedKey decorate(ByteBuffer key)
- {
- return StorageService.getPartitioner().decorateKey(key);
- }
-
public static void finishStartup()
{
persistLocalMetadata();
@@ -564,7 +559,7 @@ public final class SystemKeyspace
public static void updateMaterializedViewBuildStatus(String ksname, String viewName, Token token)
{
String req = "INSERT INTO system.%s (keyspace_name, view_name, last_token) VALUES (?, ?, ?)";
- Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
+ Token.TokenFactory factory = MaterializedViewsBuildsInProgress.partitioner.getTokenFactory();
executeInternal(String.format(req, MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS), ksname, viewName, factory.toString(token));
}
@@ -583,7 +578,7 @@ public final class SystemKeyspace
generation = row.getInt("generation_number");
if (row.has("last_key"))
{
- Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
+ Token.TokenFactory factory = MaterializedViewsBuildsInProgress.partitioner.getTokenFactory();
lastKey = factory.fromString(row.getString("last_key"));
}
@@ -717,7 +712,9 @@ public final class SystemKeyspace
private static Set<String> tokensAsSet(Collection<Token> tokens)
{
- Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
+ if (tokens.isEmpty())
+ return Collections.emptySet();
+ Token.TokenFactory factory = StorageService.instance.getTokenFactory();
Set<String> s = new HashSet<>(tokens.size());
for (Token tk : tokens)
s.add(factory.toString(tk));
@@ -726,7 +723,7 @@ public final class SystemKeyspace
private static Collection<Token> deserializeTokens(Collection<String> tokensStrings)
{
- Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
+ Token.TokenFactory factory = StorageService.instance.getTokenFactory();
List<Token> tokens = new ArrayList<>(tokensStrings.size());
for (String tk : tokensStrings)
tokens.add(factory.fromString(tk));
@@ -1165,8 +1162,7 @@ public final class SystemKeyspace
public static void updateSizeEstimates(String keyspace, String table, Map<Range<Token>, Pair<Long, Long>> estimates)
{
long timestamp = FBUtilities.timestampMicros();
- DecoratedKey key = decorate(UTF8Type.instance.decompose(keyspace));
- PartitionUpdate update = new PartitionUpdate(SizeEstimates, key, SizeEstimates.partitionColumns(), estimates.size());
+ PartitionUpdate update = new PartitionUpdate(SizeEstimates, UTF8Type.instance.decompose(keyspace), SizeEstimates.partitionColumns(), estimates.size());
Mutation mutation = new Mutation(update);
// delete all previous values with a single range tombstone.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3dd6f38..548c661 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -961,7 +961,6 @@ public class CompactionManager implements CompactionManagerMBean
expectedBloomFilterSize,
repairedAt,
sstable.getSSTableLevel(),
- cfs.partitioner,
sstable.header,
txn);
}
@@ -993,7 +992,6 @@ public class CompactionManager implements CompactionManagerMBean
(long) expectedBloomFilterSize,
repairedAt,
cfs.metadata,
- cfs.partitioner,
new MetadataCollector(sstables, cfs.metadata.comparator, minLevel),
SerializationHeader.make(cfs.metadata, sstables),
txn);
@@ -1085,7 +1083,7 @@ public class CompactionManager implements CompactionManagerMBean
}
// determine tree depth from number of partitions, but cap at 20 to prevent large tree.
int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
- MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+ MerkleTree tree = new MerkleTree(cfs.getPartitioner(), validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
long start = System.nanoTime();
try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.range);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 0cee370..7fd5717 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -80,7 +80,7 @@ public class LeveledManifest
for (int i = 0; i < generations.length; i++)
{
generations[i] = new ArrayList<>();
- lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound();
+ lastCompactedKeys[i] = cfs.getPartitioner().getMinimumToken().minKeyBound();
}
compactionCounter = new int[MAX_LEVEL_COUNT];
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 81e307a..5b3f6c7 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -179,7 +179,7 @@ public class Scrubber implements Closeable
DecoratedKey key = null;
try
{
- key = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(dataFile));
+ key = sstable.decorateKey(ByteBufferUtil.readWithShortLength(dataFile));
}
catch (Throwable th)
{
@@ -249,7 +249,7 @@ public class Scrubber implements Closeable
{
outputHandler.output(String.format("Retrying from row index; data is %s bytes starting at %s",
dataSizeFromIndex, dataStartFromIndex));
- key = sstable.partitioner.decorateKey(currentIndexKey);
+ key = sstable.decorateKey(currentIndexKey);
try
{
dataFile.seek(dataStartFromIndex);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index be0dd2a..b8a102e 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -83,7 +83,6 @@ public class Upgrader
estimatedRows,
repairedAt,
cfs.metadata,
- cfs.partitioner,
sstableMetadataCollector,
SerializationHeader.make(cfs.metadata, Sets.newHashSet(sstable)),
transaction);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 90a97a0..ae4e966 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -146,7 +146,7 @@ public class Verifier implements Closeable
DecoratedKey key = null;
try
{
- key = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(dataFile));
+ key = sstable.decorateKey(ByteBufferUtil.readWithShortLength(dataFile));
}
catch (Throwable th)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index cdacddc..53dad55 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -52,7 +52,6 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
estimatedTotalKeys,
minRepairedAt,
cfs.metadata,
- cfs.partitioner,
new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0),
SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
txn);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index ad58967..a44ea7e 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -67,7 +67,6 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
keysPerSSTable,
minRepairedAt,
cfs.metadata,
- cfs.partitioner,
new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors),
SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
txn);
@@ -96,7 +95,6 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
averageEstimatedKeysPerSSTable,
minRepairedAt,
cfs.metadata,
- cfs.partitioner,
new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors),
SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
txn);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 9902357..3942b1e 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -56,7 +56,6 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
estimatedTotalKeys / estimatedSSTables,
minRepairedAt,
cfs.metadata,
- cfs.partitioner,
new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
txn);
@@ -75,7 +74,6 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
estimatedTotalKeys / estimatedSSTables,
minRepairedAt,
cfs.metadata,
- cfs.partitioner,
new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
txn);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 14cb795..5d8670d 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -89,7 +89,6 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
currentPartitionsToWrite,
minRepairedAt,
cfs.metadata,
- cfs.partitioner,
new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
txn);
@@ -113,7 +112,6 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
currentPartitionsToWrite,
minRepairedAt,
cfs.metadata,
- cfs.partitioner,
new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
txn);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index 842cbb9..4bb0bc4 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -26,7 +26,6 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -51,10 +50,9 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
columnDef = columnDefs.iterator().next();
- CFMetaData indexedCfMetadata = SecondaryIndex.newIndexMetadata(baseCfs.metadata, columnDef);
+ CFMetaData indexedCfMetadata = SecondaryIndex.newIndexMetadata(baseCfs.metadata, columnDef, getIndexKeyComparator());
indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
indexedCfMetadata.cfName,
- new LocalPartitioner(getIndexKeyComparator()),
indexedCfMetadata,
baseCfs.getTracker().loadsstables);
}
[3/4] cassandra git commit: Revert "Revert "Stop accessing the
partitioner directly via StorageService""
Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 94031ab..9221090 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -42,15 +42,12 @@ import org.apache.cassandra.db.index.keys.KeysIndex;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.LocalByPartionerType;
+import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-
import org.apache.cassandra.utils.concurrent.Refs;
/**
@@ -79,10 +76,6 @@ public abstract class SecondaryIndex
*/
public static final String INDEX_ENTRIES_OPTION_NAME = "index_keys_and_values";
- public static final AbstractType<?> keyComparator = StorageService.getPartitioner().preservesOrder()
- ? BytesType.instance
- : new LocalByPartionerType(StorageService.getPartitioner());
-
/**
* Base CF that has many indexes
*/
@@ -303,7 +296,7 @@ public abstract class SecondaryIndex
*/
public DecoratedKey getIndexKeyFor(ByteBuffer value)
{
- return getIndexCfs().partitioner.decorateKey(value);
+ return getIndexCfs().decorateKey(value);
}
/**
@@ -381,11 +374,20 @@ public abstract class SecondaryIndex
*/
public static CFMetaData newIndexMetadata(CFMetaData baseMetadata, ColumnDefinition def)
{
+ return newIndexMetadata(baseMetadata, def, def.type);
+ }
+
+ /**
+ * Create the index metadata for the index on a given column of a given table.
+ */
+ static CFMetaData newIndexMetadata(CFMetaData baseMetadata, ColumnDefinition def, AbstractType<?> comparator)
+ {
if (def.getIndexType() == IndexType.CUSTOM)
return null;
CFMetaData.Builder builder = CFMetaData.Builder.create(baseMetadata.ksName, baseMetadata.indexColumnFamilyName(def))
.withId(baseMetadata.cfId)
+ .withPartitioner(new LocalPartitioner(comparator))
.addPartitionKey(def.name, def.type);
if (def.getIndexType() == IndexType.COMPOSITES)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index 42861c5..29f235c 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -101,7 +101,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
protected static void addGenericClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
{
- indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator);
+ indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
for (ColumnDefinition def : baseMetadata.clusteringColumns())
indexMetadata.addClusteringColumn(def.name, def.type);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
index 6529ad9..cd4aff9 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
@@ -24,7 +24,6 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.utils.concurrent.OpOrder;
/**
@@ -48,7 +47,7 @@ public class CompositesIndexOnClusteringKey extends CompositesIndex
{
public static void addClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
{
- indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator);
+ indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
List<ColumnDefinition> cks = baseMetadata.clusteringColumns();
for (int i = 0; i < columnDef.position(); i++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index d322faf..b76bf7e 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -111,7 +111,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
// *data* for a given partition.
BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(baseCfs.getComparator());
List<CompositesIndex.IndexedEntry> entries = new ArrayList<>();
- DecoratedKey partitionKey = baseCfs.partitioner.decorateKey(nextEntry.indexedKey);
+ DecoratedKey partitionKey = baseCfs.decorateKey(nextEntry.indexedKey);
while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
index 7930bd6..478559a 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
@@ -23,7 +23,6 @@ import java.util.Set;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexSearcher;
@@ -42,7 +41,7 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
{
public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition cfDef)
{
- indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator);
+ indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index bcaf70b..53a9b4a 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -84,7 +84,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
while (next == null && indexHits.hasNext())
{
Row hit = indexHits.next();
- DecoratedKey key = baseCfs.partitioner.decorateKey(hit.clustering().get(0));
+ DecoratedKey key = baseCfs.decorateKey(hit.clustering().get(0));
SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
baseCfs.metadata,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java b/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
deleted file mode 100644
index e02ba3c..0000000
--- a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.marshal;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.cql3.Term;
-import org.apache.cassandra.db.PartitionPosition;
-import org.apache.cassandra.serializers.TypeSerializer;
-import org.apache.cassandra.serializers.MarshalException;
-
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-/** for sorting columns representing row keys in the row ordering as determined by a partitioner.
- * Not intended for user-defined CFs, and will in fact error out if used with such. */
-public class LocalByPartionerType extends AbstractType<ByteBuffer>
-{
- private final IPartitioner partitioner;
-
- public LocalByPartionerType(IPartitioner partitioner)
- {
- this.partitioner = partitioner;
- }
-
- public static LocalByPartionerType getInstance(TypeParser parser)
- {
- return new LocalByPartionerType(StorageService.getPartitioner());
- }
-
- @Override
- public ByteBuffer compose(ByteBuffer bytes)
- {
- throw new UnsupportedOperationException("You can't do this with a local partitioner.");
- }
-
- @Override
- public ByteBuffer decompose(ByteBuffer bytes)
- {
- throw new UnsupportedOperationException("You can't do this with a local partitioner.");
- }
-
- public String getString(ByteBuffer bytes)
- {
- return ByteBufferUtil.bytesToHex(bytes);
- }
-
- public ByteBuffer fromString(String source)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Term fromJSONObject(Object parsed)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String toJSONString(ByteBuffer buffer, int protocolVersion)
- {
- throw new UnsupportedOperationException();
- }
-
- public int compare(ByteBuffer o1, ByteBuffer o2)
- {
- // o1 and o2 can be empty so we need to use PartitionPosition, not DecoratedKey
- return PartitionPosition.ForKey.get(o1, partitioner).compareTo(PartitionPosition.ForKey.get(o2, partitioner));
- }
-
- @Override
- public void validate(ByteBuffer bytes) throws MarshalException
- {
- throw new IllegalStateException("You shouldn't be validating this.");
- }
-
- public TypeSerializer<ByteBuffer> getSerializer()
- {
- throw new UnsupportedOperationException("You can't do this with a local partitioner.");
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java b/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java
new file mode 100644
index 0000000..efaea53
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.marshal;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.serializers.TypeSerializer;
+import org.apache.cassandra.serializers.MarshalException;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/** A type for sorting columns that represent row keys in the row ordering determined by a partitioner.
+ * Not intended for user-defined CFs, and will in fact error out if used with such. */
+public class PartitionerDefinedOrder extends AbstractType<ByteBuffer>
+{
+ private final IPartitioner partitioner;
+
+ public PartitionerDefinedOrder(IPartitioner partitioner)
+ {
+ this.partitioner = partitioner;
+ }
+
+ @Override
+ public ByteBuffer compose(ByteBuffer bytes)
+ {
+ throw new UnsupportedOperationException("You can't do this with a local partitioner.");
+ }
+
+ @Override
+ public ByteBuffer decompose(ByteBuffer bytes)
+ {
+ throw new UnsupportedOperationException("You can't do this with a local partitioner.");
+ }
+
+ public String getString(ByteBuffer bytes)
+ {
+ return ByteBufferUtil.bytesToHex(bytes);
+ }
+
+ public ByteBuffer fromString(String source)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Term fromJSONObject(Object parsed)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String toJSONString(ByteBuffer buffer, int protocolVersion)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public int compare(ByteBuffer o1, ByteBuffer o2)
+ {
+ // o1 and o2 can be empty so we need to use PartitionPosition, not DecoratedKey
+ return PartitionPosition.ForKey.get(o1, partitioner).compareTo(PartitionPosition.ForKey.get(o2, partitioner));
+ }
+
+ @Override
+ public void validate(ByteBuffer bytes) throws MarshalException
+ {
+ throw new IllegalStateException("You shouldn't be validating this.");
+ }
+
+ public TypeSerializer<ByteBuffer> getSerializer()
+ {
+ throw new UnsupportedOperationException("You can't do this with a local partitioner.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index e8ec4c0..25d1887 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.filter.ColumnFilter;
@@ -42,7 +43,6 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.Locks;
import org.apache.cassandra.utils.memory.MemtableAllocator;
import org.apache.cassandra.utils.memory.HeapAllocator;
-import org.apache.cassandra.service.StorageService;
import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
import static org.apache.cassandra.utils.btree.BTree.Dir.desc;
@@ -59,7 +59,7 @@ public class AtomicBTreePartition implements Partition
private static final Logger logger = LoggerFactory.getLogger(AtomicBTreePartition.class);
public static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreePartition(CFMetaData.createFake("keyspace", "table"),
- StorageService.getPartitioner().decorateKey(ByteBuffer.allocate(1)),
+ DatabaseDescriptor.getPartitioner().decorateKey(ByteBuffer.allocate(1)),
null));
// Reserved values for wasteTracker field. These values must not be consecutive (see avoidReservedValues)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 102008f..f2e0617 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -102,6 +102,17 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
this(metadata, key, columns, Rows.EMPTY_STATIC_ROW, new ArrayList<>(initialRowCapacity), MutableDeletionInfo.live(), null, false, true);
}
+ /**
+ * Convenience constructor taking the raw partition key: the key is decorated with
+ * {@code metadata.decorateKey(key)} and construction is delegated to the constructor
+ * that accepts the decorated key.
+ *
+ * @param metadata the metadata for the created update; also used to decorate {@code key}.
+ * @param key the undecorated partition key.
+ * @param columns passed through unchanged to the delegated constructor.
+ * @param initialRowCapacity passed through unchanged to the delegated constructor.
+ */
+ public PartitionUpdate(CFMetaData metadata, ByteBuffer key, PartitionColumns columns, int initialRowCapacity)
+ {
+     this(metadata, metadata.decorateKey(key), columns, initialRowCapacity);
+ }
+
/**
* Creates an empty immutable partition update.
*
@@ -134,7 +145,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
* Creates an immutable partition update that contains a single row update.
*
* @param metadata the metadata for the created update.
- * @param key the partition key for the partition that the created update should delete.
+ * @param key the partition key for the partition to update.
* @param row the row for the update.
*
* @return the newly created partition update containing only {@code row}.
@@ -147,6 +158,20 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
}
/**
+ * Creates an immutable partition update that contains a single row update.
+ *
+ * @param metadata the metadata for the created update.
+ * @param key the partition key for the partition to update.
+ * @param row the row for the update.
+ *
+ * @return the newly created partition update containing only {@code row}.
+ */
+ public static PartitionUpdate singleRowUpdate(CFMetaData metadata, ByteBuffer key, Row row)
+ {
+     // Decorate the raw key through the metadata, then reuse the overload taking a decorated key.
+     DecoratedKey decorated = metadata.decorateKey(key);
+     return singleRowUpdate(metadata, decorated, row);
+ }
+
+ /**
* Turns the given iterator into an update.
*
* Warning: this method does not close the provided iterator, it is up to
@@ -262,6 +287,21 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
}
/**
+ * Creates a partition update that entirely deletes a given partition.
+ *
+ * @param metadata the metadata for the created update.
+ * @param key the partition key for the partition that the created update should delete.
+ * @param timestamp the timestamp for the deletion.
+ * @param nowInSec the current time in seconds to use as local deletion time for the partition deletion.
+ *
+ * @return the newly created partition deletion update.
+ */
+ public static PartitionUpdate fullPartitionDelete(CFMetaData metadata, ByteBuffer key, long timestamp, int nowInSec)
+ {
+     // Decorate the raw key through the metadata, then reuse the overload taking a decorated key.
+     DecoratedKey decorated = metadata.decorateKey(key);
+     return fullPartitionDelete(metadata, decorated, timestamp, nowInSec);
+ }
+
+ /**
* Merges the provided updates, yielding a new update that incorporates all those updates.
*
* @param updates the collection of updates to merge. This shouldn't be empty.
@@ -695,29 +735,39 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
}
}
- public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, DecoratedKey key) throws IOException
+ public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException
{
- if (version < MessagingService.VERSION_30)
+ if (version >= MessagingService.VERSION_30)
+ {
+ assert key == null; // key is only there for the old format
+ return deserialize30(in, version, flag);
+ }
+ else
{
assert key != null;
-
- // This is only used in mutation, and mutation have never allowed "null" column families
- boolean present = in.readBoolean();
- assert present;
-
- CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
- LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version);
- int size = in.readInt();
- Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.deserializeCells(metadata, in, flag, size);
- SerializationHelper helper = new SerializationHelper(metadata, version, flag);
- try (UnfilteredRowIterator iterator = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, key, info, cells, false, helper))
- {
- return PartitionUpdate.fromIterator(iterator);
- }
+ CFMetaData metadata = deserializeMetadata(in, version);
+ DecoratedKey dk = metadata.decorateKey(key);
+ return deserializePre30(in, version, flag, metadata, dk);
}
+ }
- assert key == null; // key is only there for the old format
+ // Used to share same decorated key between updates.
+ public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, DecoratedKey key) throws IOException
+ {
+     // From VERSION_30 on, the supplied key is not passed along.
+     if (version >= MessagingService.VERSION_30)
+         return deserialize30(in, version, flag);
+
+     // Older format: read the metadata first, then build the update for the supplied key.
+     assert key != null;
+     CFMetaData metadata = deserializeMetadata(in, version);
+     return deserializePre30(in, version, flag, metadata, key);
+ }
+ private static PartitionUpdate deserialize30(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
+ {
CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, metadata, flag);
if (header.isEmpty)
@@ -752,6 +802,28 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
false);
}
+ /**
+ * Reads the leading presence flag and then the metadata from the stream.
+ *
+ * @param in the input to read from.
+ * @param version the version handed to {@code CFMetaData.serializer}.
+ * @return the metadata produced by {@code CFMetaData.serializer.deserialize(in, version)}.
+ */
+ private static CFMetaData deserializeMetadata(DataInputPlus in, int version) throws IOException
+ {
+     // This is only used in mutations, and mutations have never allowed "null" column families
+     final boolean isPresent = in.readBoolean();
+     assert isPresent;
+
+     return CFMetaData.serializer.deserialize(in, version);
+ }
+
+ /**
+ * Reads the remainder of an old-format update from the stream and converts it into a PartitionUpdate.
+ *
+ * @param in the input to read from.
+ * @param version the version passed to the legacy deserializers and the serialization helper.
+ * @param flag the flag passed to the cell deserializer and the serialization helper.
+ * @param metadata the metadata of the update being read.
+ * @param dk the decorated key handed to the row-iterator conversion.
+ * @return the update built by {@code PartitionUpdate.fromIterator} from the converted iterator.
+ */
+ private static PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, CFMetaData metadata, DecoratedKey dk) throws IOException
+ {
+     // Read order on the stream: deletion info, then an int size, then the cells.
+     LegacyLayout.LegacyDeletionInfo deletionInfo = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version);
+     int declaredSize = in.readInt();
+     Iterator<LegacyLayout.LegacyCell> legacyCells = LegacyLayout.deserializeCells(metadata, in, flag, declaredSize);
+     SerializationHelper serializationHelper = new SerializationHelper(metadata, version, flag);
+     try (UnfilteredRowIterator rows = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, dk, deletionInfo, legacyCells, false, serializationHelper))
+     {
+         return PartitionUpdate.fromIterator(rows);
+     }
+ }
+
public long serializedSize(PartitionUpdate update, int version)
{
if (version < MessagingService.VERSION_30)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index b96e0b1..531bd26 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
@@ -170,7 +169,7 @@ public class UnfilteredRowIteratorSerializer
public Header deserializeHeader(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag) throws IOException
{
- DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithVIntLength(in));
+ DecoratedKey key = metadata.decorateKey(ByteBufferUtil.readWithVIntLength(in));
int flags = in.readUnsignedByte();
boolean isReversed = (flags & IS_REVERSED) != 0;
if ((flags & IS_EMPTY) != 0)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/view/MaterializedView.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java
index 082c71d..f36abae 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedView.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedView.java
@@ -303,9 +303,10 @@ public class MaterializedView
partitionKey[i] = value;
}
- return getViewCfs().partitioner.decorateKey(CFMetaData.serializePartitionKey(getViewCfs().metadata
- .getKeyValidatorAsClusteringComparator()
- .make(partitionKey)));
+ CFMetaData metadata = getViewCfs().metadata;
+ return metadata.decorateKey(CFMetaData.serializePartitionKey(metadata
+ .getKeyValidatorAsClusteringComparator()
+ .make(partitionKey)));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/db/view/TemporalRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java
index 53e4e91..d0ba5ea 100644
--- a/src/java/org/apache/cassandra/db/view/TemporalRow.java
+++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java
@@ -377,7 +377,7 @@ public class TemporalRow
this.baseCfs = baseCfs;
this.viewPrimaryKey = viewPrimaryKey;
this.key = key;
- this.dk = baseCfs.partitioner.decorateKey(key);
+ this.dk = baseCfs.decorateKey(key);
this.clusteringToRow = new HashMap<>();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 2cb7f61..31fda34 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -164,7 +164,7 @@ public class BootStrapper extends ProgressEventNotifierSupport
// if user specified tokens, use those
if (initialTokens.size() > 0)
return getSpecifiedTokens(metadata, initialTokens);
-
+
int numTokens = DatabaseDescriptor.getNumTokens();
if (numTokens < 1)
throw new ConfigurationException("num_tokens must be >= 1");
@@ -179,13 +179,13 @@ public class BootStrapper extends ProgressEventNotifierSupport
}
private static Collection<Token> getSpecifiedTokens(final TokenMetadata metadata,
- Collection<String> initialTokens)
+ Collection<String> initialTokens)
{
logger.debug("tokens manually specified as {}", initialTokens);
List<Token> tokens = new ArrayList<>(initialTokens.size());
for (String tokenString : initialTokens)
{
- Token token = StorageService.getPartitioner().getTokenFactory().fromString(tokenString);
+ Token token = metadata.partitioner.getTokenFactory().fromString(tokenString);
if (metadata.getEndpoint(token) != null)
throw new ConfigurationException("Bootstrapping to existing token " + tokenString + " is not allowed (decommission/removenode the old node first).");
tokens.add(token);
@@ -202,8 +202,8 @@ public class BootStrapper extends ProgressEventNotifierSupport
if (ks == null)
throw new ConfigurationException("Problem opening token allocation keyspace " + allocationKeyspace);
AbstractReplicationStrategy rs = ks.getReplicationStrategy();
-
- return TokenAllocation.allocateTokens(metadata, rs, StorageService.getPartitioner(), address, numTokens);
+
+ return TokenAllocation.allocateTokens(metadata, rs, address, numTokens);
}
public static Collection<Token> getRandomTokens(TokenMetadata metadata, int numTokens)
@@ -211,7 +211,7 @@ public class BootStrapper extends ProgressEventNotifierSupport
Set<Token> tokens = new HashSet<>(numTokens);
while (tokens.size() < numTokens)
{
- Token token = StorageService.getPartitioner().getRandomToken();
+ Token token = metadata.partitioner.getRandomToken();
if (metadata.getEndpoint(token) == null)
tokens.add(token);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
index d7139d0..46872c1 100644
--- a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
@@ -300,4 +300,9 @@ public class ByteOrderedPartitioner implements IPartitioner
{
return BytesType.instance;
}
+
+ /**
+ * @return {@code BytesType.instance}, used as this partitioner's partition ordering.
+ */
+ public AbstractType<?> partitionOrdering()
+ {
+ return BytesType.instance;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/IPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java b/src/java/org/apache/cassandra/dht/IPartitioner.java
index b22da66..e0a08dc 100644
--- a/src/java/org/apache/cassandra/dht/IPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/IPartitioner.java
@@ -78,4 +78,10 @@ public interface IPartitioner
public Map<Token, Float> describeOwnership(List<Token> sortedTokens);
public AbstractType<?> getTokenValidator();
+
+ /**
+ * Abstract type that orders the same way as DecoratedKeys provided by this partitioner.
+ * Used by secondary indices.
+ *
+ * @return the type whose ordering matches that of this partitioner's DecoratedKeys.
+ */
+ public AbstractType<?> partitionOrdering();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/LocalPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
index 01dc75e..2a5a16e 100644
--- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
@@ -84,6 +84,11 @@ public class LocalPartitioner implements IPartitioner
return comparator;
}
+ /**
+ * @return this partitioner's {@code comparator}, used as its partition ordering.
+ */
+ public AbstractType<?> partitionOrdering()
+ {
+ return comparator;
+ }
+
public class LocalToken extends ComparableObjectToken<ByteBuffer>
{
static final long serialVersionUID = 8437543776403014875L;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
index 003879c..d68be3f 100644
--- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
+++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.PreHashedDecoratedKey;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.PartitionerDefinedOrder;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -45,6 +46,7 @@ public class Murmur3Partitioner implements IPartitioner
private static final int HEAP_SIZE = (int) ObjectSizes.measureDeep(MINIMUM);
public static final Murmur3Partitioner instance = new Murmur3Partitioner();
+ public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance);
public DecoratedKey decorateKey(ByteBuffer key)
{
@@ -288,4 +290,9 @@ public class Murmur3Partitioner implements IPartitioner
{
return LongType.instance;
}
+
+ /**
+ * @return the shared static {@code partitionOrdering} field, a PartitionerDefinedOrder built around {@code instance}.
+ */
+ public AbstractType<?> partitionOrdering()
+ {
+ return partitionOrdering;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
index ae0326f..45c2cfa 100644
--- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
@@ -240,4 +240,9 @@ public class OrderPreservingPartitioner implements IPartitioner
{
return UTF8Type.instance;
}
+
+ /**
+ * @return {@code UTF8Type.instance}, used as this partitioner's partition ordering.
+ */
+ public AbstractType<?> partitionOrdering()
+ {
+ return UTF8Type.instance;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
index 71a0a99..b0dea01 100644
--- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.PartitionerDefinedOrder;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.GuidGenerator;
@@ -47,6 +48,7 @@ public class RandomPartitioner implements IPartitioner
private static final int HEAP_SIZE = (int) ObjectSizes.measureDeep(new BigIntegerToken(FBUtilities.hashToBigInteger(ByteBuffer.allocate(1))));
public static final RandomPartitioner instance = new RandomPartitioner();
+ public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance);
public DecoratedKey decorateKey(ByteBuffer key)
{
@@ -196,4 +198,9 @@ public class RandomPartitioner implements IPartitioner
{
return IntegerType.instance;
}
+
+ /**
+ * @return the shared static {@code partitionOrdering} field, a PartitionerDefinedOrder built around {@code instance}.
+ */
+ public AbstractType<?> partitionOrdering()
+ {
+ return partitionOrdering;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 68c8a11..e7624c3 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -333,7 +333,7 @@ public class RangeStreamer
Collection<Range<Token>> ranges = entry.getValue().getValue();
// filter out already streamed ranges
- Set<Range<Token>> availableRanges = stateStore.getAvailableRanges(keyspace, StorageService.getPartitioner());
+ Set<Range<Token>> availableRanges = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner);
if (ranges.removeAll(availableRanges))
{
logger.info("Some ranges of {} are already available. Skipping streaming those ranges.", availableRanges);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
index dd3a02b..b4281ce 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
@@ -33,7 +33,6 @@ import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
@@ -49,12 +48,11 @@ public class TokenAllocation
public static Collection<Token> allocateTokens(final TokenMetadata tokenMetadata,
final AbstractReplicationStrategy rs,
- final IPartitioner partitioner,
final InetAddress endpoint,
int numTokens)
{
StrategyAdapter strategy = getStrategy(tokenMetadata, rs, endpoint);
- Collection<Token> tokens = create(tokenMetadata, strategy, partitioner).addUnit(endpoint, numTokens);
+ Collection<Token> tokens = create(tokenMetadata, strategy).addUnit(endpoint, numTokens);
tokens = adjustForCrossDatacenterClashes(tokenMetadata, strategy, tokens);
if (logger.isWarnEnabled())
@@ -141,7 +139,7 @@ public class TokenAllocation
return stat;
}
- static TokenAllocator<InetAddress> create(TokenMetadata tokenMetadata, StrategyAdapter strategy, IPartitioner partitioner)
+ static TokenAllocator<InetAddress> create(TokenMetadata tokenMetadata, StrategyAdapter strategy)
{
NavigableMap<Token, InetAddress> sortedTokens = new TreeMap<>();
for (Map.Entry<Token, InetAddress> en : tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap().entrySet())
@@ -149,7 +147,7 @@ public class TokenAllocation
if (strategy.inAllocationRing(en.getValue()))
sortedTokens.put(en.getKey(), en.getValue());
}
- return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, partitioner);
+ return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, tokenMetadata.partitioner);
}
interface StrategyAdapter extends ReplicationStrategy<InetAddress>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index e61a35a..5fa402a 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -584,7 +584,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
JVMStabilityInspector.inspectThrowable(th);
// TODO this is broken
logger.warn("Unable to calculate tokens for {}. Will use a random one", address);
- tokens = Collections.singletonList(StorageService.getPartitioner().getRandomToken());
+ tokens = Collections.singletonList(StorageService.instance.getTokenMetadata().partitioner.getRandomToken());
}
int generation = epState.getHeartBeatState().getGeneration();
int heartbeat = epState.getHeartBeatState().getHeartBeatVersion();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index acc9141..f4b4da8 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.Closeable;
+import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -30,7 +31,6 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.Pair;
@@ -46,12 +46,11 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
protected SSTableFormat.Type formatType = DatabaseDescriptor.getSSTableFormat();
protected static AtomicInteger generation = new AtomicInteger(0);
- protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns)
+ protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns)
{
this.metadata = metadata;
this.directory = directory;
this.columns = columns;
- DatabaseDescriptor.setPartitioner(partitioner);
}
protected void setSSTableFormatType(SSTableFormat.Type type)
@@ -103,6 +102,11 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
return maxGen;
}
+ /**
+ * Variant of {@code getUpdateFor} taking the raw key: the key is decorated with
+ * {@code metadata.decorateKey(key)} before delegating to the overload that takes the decorated key.
+ *
+ * @param key the undecorated partition key.
+ * @return whatever the decorated-key overload returns for that key.
+ */
+ PartitionUpdate getUpdateFor(ByteBuffer key) throws IOException
+ {
+     DecoratedKey decorated = metadata.decorateKey(key);
+     return getUpdateFor(decorated);
+ }
+
/**
* Returns a PartitionUpdate suitable to write on this writer for the provided key.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 43e214b..7ae5651 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -77,6 +77,9 @@ public class CQLSSTableWriter implements Closeable
static
{
Config.setClientMode(true);
+ // Partitioner is not set in client mode.
+ if (DatabaseDescriptor.getPartitioner() == null)
+ DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
}
private final AbstractSSTableSimpleWriter writer;
@@ -219,10 +222,7 @@ public class CQLSSTableWriter implements Closeable
try
{
for (ByteBuffer key : keys)
- {
- DecoratedKey dk = DatabaseDescriptor.getPartitioner().decorateKey(key);
- insert.addUpdateForKey(writer.getUpdateFor(dk), clustering, params);
- }
+ insert.addUpdateForKey(writer.getUpdateFor(key), clustering, params);
return this;
}
catch (SSTableSimpleUnsortedWriter.SyncException e)
@@ -277,7 +277,6 @@ public class CQLSSTableWriter implements Closeable
public static class Builder
{
private File directory;
- private IPartitioner partitioner = Murmur3Partitioner.instance;
protected SSTableFormat.Type formatType = null;
@@ -402,7 +401,7 @@ public class CQLSSTableWriter implements Closeable
*/
public Builder withPartitioner(IPartitioner partitioner)
{
- this.partitioner = partitioner;
+ this.schema = schema.copy(partitioner);
return this;
}
@@ -511,8 +510,8 @@ public class CQLSSTableWriter implements Closeable
throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()");
AbstractSSTableSimpleWriter writer = sorted
- ? new SSTableSimpleWriter(directory, schema, partitioner, insert.updatedColumns())
- : new SSTableSimpleUnsortedWriter(directory, schema, partitioner, insert.updatedColumns(), bufferSizeInMB);
+ ? new SSTableSimpleWriter(directory, schema, insert.updatedColumns())
+ : new SSTableSimpleUnsortedWriter(directory, schema, insert.updatedColumns(), bufferSizeInMB);
if (formatType != null)
writer.setSSTableFormatType(formatType);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
index 5de2452..8fb300b 100644
--- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
@@ -23,8 +23,10 @@ import java.io.IOException;
import com.google.common.collect.AbstractIterator;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -80,11 +82,13 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
}
private final In in;
+ private final IPartitioner partitioner;
- public KeyIterator(Descriptor desc)
+ public KeyIterator(Descriptor desc, CFMetaData metadata)
{
in = new In(new File(desc.filenameFor(Component.PRIMARY_INDEX)));
+ partitioner = metadata.partitioner;
}
protected DecoratedKey computeNext()
@@ -94,7 +98,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
if (in.isEOF())
return endOfData();
- DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in.get()));
+ DecoratedKey key = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in.get()));
RowIndexEntry.Serializer.skip(in.get()); // skip remainder of the entry
return key;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
index f1e01f2..bbc56cc 100644
--- a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
@@ -40,7 +40,7 @@ public class ReducingKeyIterator implements CloseableIterator<DecoratedKey>
{
iters = new ArrayList<>(sstables.size());
for (SSTableReader sstable : sstables)
- iters.add(new KeyIterator(sstable.descriptor));
+ iters.add(new KeyIterator(sstable.descriptor, sstable.metadata));
}
private void maybeInit()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 516534d..b86d9b4 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.io.sstable;
import java.io.*;
+import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -63,31 +64,29 @@ public abstract class SSTable
public final Descriptor descriptor;
protected final Set<Component> components;
public final CFMetaData metadata;
- public final IPartitioner partitioner;
public final boolean compression;
public DecoratedKey first;
public DecoratedKey last;
- protected SSTable(Descriptor descriptor, CFMetaData metadata, IPartitioner partitioner)
+ protected SSTable(Descriptor descriptor, CFMetaData metadata)
{
- this(descriptor, new HashSet<>(), metadata, partitioner);
+ this(descriptor, new HashSet<>(), metadata);
}
- protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner)
+ protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata)
{
// In almost all cases, metadata shouldn't be null, but allowing null allows to create a mostly functional SSTable without
// full schema definition. SSTableLoader use that ability
assert descriptor != null;
assert components != null;
- assert partitioner != null;
+ assert metadata != null;
this.descriptor = descriptor;
Set<Component> dataComponents = new HashSet<>(components);
this.compression = dataComponents.contains(Component.COMPRESSION_INFO);
this.components = new CopyOnWriteArraySet<>(dataComponents);
this.metadata = metadata;
- this.partitioner = partitioner;
}
/**
@@ -121,6 +120,16 @@ public abstract class SSTable
return true;
}
+ public IPartitioner getPartitioner()
+ {
+ return metadata.partitioner;
+ }
+
+ public DecoratedKey decorateKey(ByteBuffer key)
+ {
+ return getPartitioner().decorateKey(key);
+ }
+
/**
* If the given @param key occupies only part of a larger buffer, allocate a new buffer that is only
* as large as necessary.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index f25d3ff..20c3962 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -133,7 +133,7 @@ public class SSTableLoader implements StreamEventHandler
// To conserve memory, open SSTableReaders without bloom filters and discard
// the index summary after calculating the file sections to stream and the estimated
// number of keys for each endpoint. See CASSANDRA-5555 for details.
- SSTableReader sstable = SSTableReader.openForBatch(desc, components, metadata, client.getPartitioner());
+ SSTableReader sstable = SSTableReader.openForBatch(desc, components, metadata);
sstables.add(sstable);
// calculate the sstable sections to stream as well as the estimated number of
@@ -252,7 +252,6 @@ public class SSTableLoader implements StreamEventHandler
public static abstract class Client
{
private final Map<InetAddress, Collection<Range<Token>>> endpointToRanges = new HashMap<>();
- private IPartitioner partitioner;
/**
* Initialize the client.
@@ -299,23 +298,6 @@ public class SSTableLoader implements StreamEventHandler
return endpointToRanges;
}
- protected void setPartitioner(String partclass) throws ConfigurationException
- {
- setPartitioner(FBUtilities.newPartitioner(partclass));
- }
-
- protected void setPartitioner(IPartitioner partitioner)
- {
- this.partitioner = partitioner;
- // the following is still necessary since Range/Token reference partitioner through StorageService.getPartitioner
- DatabaseDescriptor.setPartitioner(partitioner);
- }
-
- public IPartitioner getPartitioner()
- {
- return partitioner;
- }
-
protected void addRangeForEndpoint(Range<Token> range, InetAddress endpoint)
{
Collection<Range<Token>> ranges = endpointToRanges.get(endpoint);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index a70b92f..f4b9adf 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.UnfilteredSerializer;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.utils.JVMStabilityInspector;
/**
@@ -60,9 +59,9 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
private final BlockingQueue<Buffer> writeQueue = new SynchronousQueue<Buffer>();
private final DiskWriter diskWriter = new DiskWriter();
- SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns, long bufferSizeInMB)
+ SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, PartitionColumns columns, long bufferSizeInMB)
{
- super(directory, metadata, partitioner, columns);
+ super(directory, metadata, columns);
this.bufferSize = bufferSizeInMB * 1024L * 1024L;
this.header = new SerializationHeader(metadata, columns, EncodingStats.NO_STATS);
diskWriter.start();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index b22a048..45722cd 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -44,9 +44,9 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
private SSTableTxnWriter writer;
- protected SSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns)
+ protected SSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns)
{
- super(directory, metadata, partitioner, columns);
+ super(directory, metadata, columns);
}
private SSTableTxnWriter getOrCreateWriter()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 6d39d2d..5ceced5 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -27,10 +27,11 @@ import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
@@ -42,27 +43,23 @@ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.lifecycle.TransactionLogs;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.*;
-import org.apache.cassandra.io.sstable.format.big.BigTableWriter;
import org.apache.cassandra.io.sstable.metadata.*;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.metrics.RestorableMeter;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.CacheService;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.SelfRefCounted;
@@ -351,21 +348,18 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException
{
- IPartitioner p = desc.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR)
- ? new LocalPartitioner(metadata.getKeyValidator())
- : StorageService.getPartitioner();
- return open(desc, componentsFor(desc), metadata, p);
+ return open(desc, componentsFor(desc), metadata);
}
- public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
+ public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
{
- return open(descriptor, components, metadata, partitioner, true, true);
+ return open(descriptor, components, metadata, true, true);
}
// use only for offline or "Standalone" operations
public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, ColumnFamilyStore cfs) throws IOException
{
- return open(descriptor, components, cfs.metadata, cfs.partitioner, false, false); // do not track hotness
+ return open(descriptor, components, cfs.metadata, false, false); // do not track hotness
}
/**
@@ -374,11 +368,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
* @param descriptor
* @param components
* @param metadata
- * @param partitioner
* @return opened SSTableReader
* @throws IOException
*/
- public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
+ public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
{
// Minimum components without which we can't do anything
assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
@@ -394,7 +387,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
// Check if sstable is created using same partitioner.
// Partitioner can be null, which indicates older version of sstable or no stats available.
// In that case, we skip the check.
- String partitionerName = partitioner.getClass().getCanonicalName();
+ String partitionerName = metadata.partitioner.getClass().getCanonicalName();
if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
{
logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
@@ -406,7 +399,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
SSTableReader sstable = internalOpen(descriptor,
components,
metadata,
- partitioner,
System.currentTimeMillis(),
statsMetadata,
OpenReason.NORMAL,
@@ -431,7 +423,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public static SSTableReader open(Descriptor descriptor,
Set<Component> components,
CFMetaData metadata,
- IPartitioner partitioner,
boolean validate,
boolean trackHotness) throws IOException
{
@@ -452,7 +443,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
// Check if sstable is created using same partitioner.
// Partitioner can be null, which indicates older version of sstable or no stats available.
// In that case, we skip the check.
- String partitionerName = partitioner.getClass().getCanonicalName();
+ String partitionerName = metadata.partitioner.getClass().getCanonicalName();
if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
{
logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
@@ -464,7 +455,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
SSTableReader sstable = internalOpen(descriptor,
components,
metadata,
- partitioner,
System.currentTimeMillis(),
statsMetadata,
OpenReason.NORMAL,
@@ -502,8 +492,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
- final CFMetaData metadata,
- final IPartitioner partitioner)
+ final CFMetaData metadata)
{
final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>();
@@ -517,7 +506,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
SSTableReader sstable;
try
{
- sstable = open(entry.getKey(), entry.getValue(), metadata, partitioner);
+ sstable = open(entry.getKey(), entry.getValue(), metadata);
}
catch (CorruptSSTableException ex)
{
@@ -562,7 +551,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public static SSTableReader internalOpen(Descriptor desc,
Set<Component> components,
CFMetaData metadata,
- IPartitioner partitioner,
SegmentedFile ifile,
SegmentedFile dfile,
IndexSummary isummary,
@@ -572,9 +560,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
OpenReason openReason,
SerializationHeader header)
{
- assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
+ assert desc != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
- SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
+ SSTableReader reader = internalOpen(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header);
reader.bf = bf;
reader.ifile = ifile;
@@ -589,7 +577,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
private static SSTableReader internalOpen(final Descriptor descriptor,
Set<Component> components,
CFMetaData metadata,
- IPartitioner partitioner,
Long maxDataAge,
StatsMetadata sstableMetadata,
OpenReason openReason,
@@ -597,19 +584,18 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
{
Factory readerFactory = descriptor.getFormat().getReaderFactory();
- return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
+ return readerFactory.open(descriptor, components, metadata, maxDataAge, sstableMetadata, openReason, header);
}
protected SSTableReader(final Descriptor desc,
Set<Component> components,
CFMetaData metadata,
- IPartitioner partitioner,
long maxDataAge,
StatsMetadata sstableMetadata,
OpenReason openReason,
SerializationHeader header)
{
- super(desc, components, metadata, partitioner);
+ super(desc, components, metadata);
this.sstableMetadata = sstableMetadata;
this.header = header;
this.maxDataAge = maxDataAge;
@@ -814,7 +800,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
{
ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex);
- DecoratedKey decoratedKey = partitioner.decorateKey(key);
+ DecoratedKey decoratedKey = decorateKey(key);
if (first == null)
first = decoratedKey;
last = decoratedKey;
@@ -832,7 +818,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
if (!summaryLoaded)
- indexSummary = summaryBuilder.build(partitioner);
+ indexSummary = summaryBuilder.build(getPartitioner());
}
}
@@ -862,10 +848,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
{
iStream = new DataInputStream(new FileInputStream(summariesFile));
indexSummary = IndexSummary.serializer.deserialize(
- iStream, partitioner, descriptor.version.hasSamplingLevel(),
+ iStream, getPartitioner(), descriptor.version.hasSamplingLevel(),
metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
- first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
- last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+ first = decorateKey(ByteBufferUtil.readWithLength(iStream));
+ last = decorateKey(ByteBufferUtil.readWithLength(iStream));
ibuilder.deserializeBounds(iStream);
dbuilder.deserializeBounds(iStream);
}
@@ -1064,7 +1050,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
SSTableReader replacement = internalOpen(descriptor,
components,
metadata,
- partitioner,
ifile != null ? ifile.sharedCopy() : null,
dfile.sharedCopy(),
newSummary,
@@ -1168,7 +1153,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
else if (samplingLevel < indexSummary.getSamplingLevel())
{
// we can use the existing index summary to make a smaller one
- newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
+ newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, getPartitioner());
try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression))
@@ -1203,11 +1188,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
long indexPosition;
while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
{
- summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
+ summaryBuilder.maybeAddEntry(decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
RowIndexEntry.Serializer.skip(primaryIndex);
}
- return summaryBuilder.build(partitioner);
+ return summaryBuilder.build(getPartitioner());
}
}
finally
@@ -1304,8 +1289,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata();
- //We need the parent cf metadata
- String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
+ // We need the parent cf metadata
+ String cfName = metadata.isIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName));
return cmd;
@@ -1477,7 +1462,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public DecoratedKey next()
{
byte[] bytes = indexSummary.getKey(idx++);
- return partitioner.decorateKey(ByteBuffer.wrap(bytes));
+ return decorateKey(ByteBuffer.wrap(bytes));
}
public void remove()
@@ -1619,7 +1604,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
while (!in.isEOF())
{
ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
- DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
+ DecoratedKey indexDecoratedKey = decorateKey(indexKey);
if (indexDecoratedKey.compareTo(token) > 0)
return indexDecoratedKey;
@@ -2300,7 +2285,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public abstract SSTableReader open(final Descriptor descriptor,
Set<Component> components,
CFMetaData metadata,
- IPartitioner partitioner,
Long maxDataAge,
StatsMetadata sstableMetadata,
OpenReason openReason,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 08a9dcc..fa691b8 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -27,13 +27,11 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
@@ -74,11 +72,10 @@ public abstract class SSTableWriter extends SSTable implements Transactional
long keyCount,
long repairedAt,
CFMetaData metadata,
- IPartitioner partitioner,
MetadataCollector metadataCollector,
SerializationHeader header)
{
- super(descriptor, components(metadata), metadata, partitioner);
+ super(descriptor, components(metadata), metadata);
this.keyCount = keyCount;
this.repairedAt = repairedAt;
this.metadataCollector = metadataCollector;
@@ -90,19 +87,18 @@ public abstract class SSTableWriter extends SSTable implements Transactional
Long keyCount,
Long repairedAt,
CFMetaData metadata,
- IPartitioner partitioner,
MetadataCollector metadataCollector,
SerializationHeader header,
LifecycleTransaction txn)
{
Factory writerFactory = descriptor.getFormat().getWriterFactory();
- return writerFactory.open(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header, txn);
+ return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn);
}
public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
{
CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
- return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, DatabaseDescriptor.getPartitioner(), header, txn);
+ return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, txn);
}
public static SSTableWriter create(CFMetaData metadata,
@@ -110,12 +106,11 @@ public abstract class SSTableWriter extends SSTable implements Transactional
long keyCount,
long repairedAt,
int sstableLevel,
- IPartitioner partitioner,
SerializationHeader header,
LifecycleTransaction txn)
{
MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
- return create(descriptor, keyCount, repairedAt, metadata, partitioner, collector, header, txn);
+ return create(descriptor, keyCount, repairedAt, metadata, collector, header, txn);
}
public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header,LifecycleTransaction txn)
@@ -255,7 +250,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
protected Map<MetadataType, MetadataComponent> finalizeMetadata()
{
- return metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
+ return metadataCollector.finalizeMetadata(getPartitioner().getClass().getCanonicalName(),
metadata.getBloomFilterFpChance(),
repairedAt,
header);
@@ -287,7 +282,6 @@ public abstract class SSTableWriter extends SSTable implements Transactional
long keyCount,
long repairedAt,
CFMetaData metadata,
- IPartitioner partitioner,
MetadataCollector metadataCollector,
SerializationHeader header,
LifecycleTransaction txn);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index a072d4d..860cd9f 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -23,7 +23,6 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
@@ -86,21 +85,20 @@ public class BigFormat implements SSTableFormat
long keyCount,
long repairedAt,
CFMetaData metadata,
- IPartitioner partitioner,
MetadataCollector metadataCollector,
SerializationHeader header,
LifecycleTransaction txn)
{
- return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header, txn);
+ return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn);
}
}
static class ReaderFactory extends SSTableReader.Factory
{
@Override
- public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header)
+ public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header)
{
- return new BigTableReader(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
+ return new BigTableReader(descriptor, components, metadata, maxDataAge, sstableMetadata, openReason, header);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index b539c79..87608fd 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -52,9 +52,9 @@ public class BigTableReader extends SSTableReader
{
private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class);
- BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header)
+ BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header)
{
- super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
+ super(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header);
}
public SliceableUnfilteredRowIterator iterator(DecoratedKey key, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
@@ -201,7 +201,7 @@ public class BigTableReader extends SSTableReader
}
else
{
- DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
+ DecoratedKey indexDecoratedKey = decorateKey(indexKey);
int comparison = indexDecoratedKey.compareTo(key);
int v = op.apply(comparison);
opSatisfied = (v == 0);
@@ -227,7 +227,7 @@ public class BigTableReader extends SSTableReader
// expensive sanity check! see CASSANDRA-4687
try (FileDataInput fdi = dfile.getSegment(indexEntry.position))
{
- DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi));
+ DecoratedKey keyInDisk = decorateKey(ByteBufferUtil.readWithShortLength(fdi));
if (!keyInDisk.equals(key))
throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
}
[2/4] cassandra git commit: Revert "Revert "Stop accessing the
partitioner directly via StorageService""
Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 0451a98..1b3b407 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -172,7 +172,7 @@ public class BigTableScanner implements ISSTableScanner
while (!ifile.isEOF())
{
indexPosition = ifile.getFilePointer();
- DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
+ DecoratedKey indexDecoratedKey = sstable.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
if (indexDecoratedKey.compareTo(currentRange.left) > 0 || currentRange.contains(indexDecoratedKey))
{
// Found, just read the dataPosition and seek into index and data files
@@ -282,7 +282,7 @@ public class BigTableScanner implements ISSTableScanner
if (ifile.isEOF())
return endOfData();
- currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
+ currentKey = sstable.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
currentEntry = rowIndexEntrySerializer.deserialize(ifile);
} while (!currentRange.contains(currentKey));
}
@@ -301,7 +301,7 @@ public class BigTableScanner implements ISSTableScanner
else
{
// we need the position of the start of the next key, regardless of whether it falls in the current range
- nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
+ nextKey = sstable.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
nextEntry = rowIndexEntrySerializer.deserialize(ifile);
if (!currentRange.contains(nextKey))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index ff279a8..5607a7e 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.io.sstable.format.big;
import java.io.*;
import java.util.Map;
-import java.util.Set;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -32,7 +31,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -62,12 +60,11 @@ public class BigTableWriter extends SSTableWriter
Long keyCount,
Long repairedAt,
CFMetaData metadata,
- IPartitioner partitioner,
MetadataCollector metadataCollector,
SerializationHeader header,
LifecycleTransaction txn)
{
- super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header);
+ super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header);
txn.trackNew(this); // must track before any files are created
if (compression)
@@ -243,13 +240,12 @@ public class BigTableWriter extends SSTableWriter
StatsMetadata stats = statsMetadata();
assert boundary.indexLength > 0 && boundary.dataLength > 0;
// open the reader early
- IndexSummary indexSummary = iwriter.summary.build(partitioner, boundary);
+ IndexSummary indexSummary = iwriter.summary.build(metadata.partitioner, boundary);
SegmentedFile ifile = iwriter.builder.buildIndex(descriptor, indexSummary, boundary);
SegmentedFile dfile = dbuilder.buildData(descriptor, stats, boundary);
SSTableReader sstable = SSTableReader.internalOpen(descriptor,
components, metadata,
- partitioner, ifile,
- dfile, indexSummary,
+ ifile, dfile, indexSummary,
iwriter.bf.sharedCopy(), maxDataAge, stats, SSTableReader.OpenReason.EARLY, header);
// now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
@@ -275,13 +271,12 @@ public class BigTableWriter extends SSTableWriter
StatsMetadata stats = statsMetadata();
// finalize in-memory state for the reader
- IndexSummary indexSummary = iwriter.summary.build(partitioner);
+ IndexSummary indexSummary = iwriter.summary.build(this.metadata.partitioner);
SegmentedFile ifile = iwriter.builder.buildIndex(desc, indexSummary);
SegmentedFile dfile = dbuilder.buildData(desc, stats);
SSTableReader sstable = SSTableReader.internalOpen(desc,
components,
this.metadata,
- partitioner,
ifile,
dfile,
indexSummary,
@@ -475,7 +470,7 @@ public class BigTableWriter extends SSTableWriter
// save summary
summary.prepareToCommit();
- try (IndexSummary summary = iwriter.summary.build(partitioner))
+ try (IndexSummary summary = iwriter.summary.build(getPartitioner()))
{
SSTableReader.saveSummary(descriptor, first, last, iwriter.builder, dbuilder, summary);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 1b3c560..c3ec353 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -26,12 +26,14 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.*;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
@@ -92,6 +94,7 @@ public class TokenMetadata
private volatile ArrayList<Token> sortedTokens;
private final Topology topology;
+ public final IPartitioner partitioner;
private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>()
{
@@ -108,17 +111,28 @@ public class TokenMetadata
{
this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp),
HashBiMap.<InetAddress, UUID>create(),
- new Topology());
+ new Topology(),
+ DatabaseDescriptor.getPartitioner());
}
- private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology)
+ private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology, IPartitioner partitioner)
{
this.tokenToEndpointMap = tokenToEndpointMap;
this.topology = topology;
+ this.partitioner = partitioner;
endpointToHostIdMap = endpointsMap;
sortedTokens = sortTokens();
}
+ /**
+ * To be used by tests only (via {@link StorageService#setPartitionerUnsafe}).
+ */
+ @VisibleForTesting
+ public TokenMetadata cloneWithNewPartitioner(IPartitioner newPartitioner)
+ {
+ return new TokenMetadata(tokenToEndpointMap, endpointToHostIdMap, topology, newPartitioner);
+ }
+
private ArrayList<Token> sortTokens()
{
return new ArrayList<Token>(tokenToEndpointMap.keySet());
@@ -521,7 +535,8 @@ public class TokenMetadata
{
return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp),
HashBiMap.create(endpointToHostIdMap),
- new Topology(topology));
+ new Topology(topology),
+ partitioner);
}
finally
{
@@ -880,7 +895,7 @@ public class TokenMetadata
public static Iterator<Token> ringIterator(final ArrayList<Token> ring, Token start, boolean includeMin)
{
if (ring.isEmpty())
- return includeMin ? Iterators.singletonIterator(StorageService.getPartitioner().getMinimumToken())
+ return includeMin ? Iterators.singletonIterator(start.getPartitioner().getMinimumToken())
: Iterators.<Token>emptyIterator();
final boolean insertMin = includeMin && !ring.get(0).isMinimum();
@@ -896,7 +911,7 @@ public class TokenMetadata
{
// return minimum for index == -1
if (j == -1)
- return StorageService.getPartitioner().getMinimumToken();
+ return start.getPartitioner().getMinimumToken();
// return ring token for other indexes
return ring.get(j);
}
@@ -1093,6 +1108,11 @@ public class TokenMetadata
cachedTokenMap.set(null);
}
+ public DecoratedKey decorateKey(ByteBuffer key)
+ {
+ return partitioner.decorateKey(key);
+ }
+
/**
* Tracks the assignment of racks and endpoints in each datacenter for all the "normal" endpoints
* in this TokenMetadata. This allows faster calculation of endpoints in NetworkTopologyStrategy.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 4f15da2..422fdb3 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -1108,12 +1108,14 @@ public final class MessagingService implements MessagingServiceMBean
public static IPartitioner globalPartitioner()
{
- return DatabaseDescriptor.getPartitioner();
+ return StorageService.instance.getTokenMetadata().partitioner;
}
public static void validatePartitioner(AbstractBounds<?> bounds)
{
if (globalPartitioner() != bounds.left.getPartitioner())
- throw new AssertionError();
+ throw new AssertionError(String.format("Partitioner in bounds serialization. Expected %s, was %s.",
+ globalPartitioner().getClass().getName(),
+ bounds.left.getPartitioner().getClass().getName()));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index c0855c4..d765ae6 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -85,8 +85,8 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
public boolean apply(SSTableReader sstable)
{
return sstable != null &&
- !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i
- new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
+ !sstable.metadata.isIndex() && // exclude SSTables from 2i
+ new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
}
}, true); //ephemeral snapshot, if repair fails, it will be cleaned next startup
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
index 1c21e41..41da481 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
@@ -38,7 +38,6 @@ import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.compress.CompressionParameters;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -307,9 +306,16 @@ public final class LegacySchemaMigrator
defaultValidator);
}
- // The legacy schema did not have views, so we know that we are not loading a materialized view
- boolean isMaterializedView = false;
- CFMetaData cfm = CFMetaData.create(ksName, cfName, cfId, isDense, isCompound, isSuper, isCounter, isMaterializedView, columnDefs);
+ CFMetaData cfm = CFMetaData.create(ksName,
+ cfName,
+ cfId,
+ isDense,
+ isCompound,
+ isSuper,
+ isCounter,
+ false, // legacy schema did not contain views
+ columnDefs,
+ DatabaseDescriptor.getPartitioner());
cfm.readRepairChance(tableRow.getDouble("read_repair_chance"));
cfm.dcLocalReadRepairChance(tableRow.getDouble("local_read_repair_chance"));
@@ -579,7 +585,7 @@ public final class LegacySchemaMigrator
ClusteringComparator comparator = store.metadata.comparator;
Slices slices = Slices.with(comparator, Slice.make(comparator, typeName));
int nowInSec = FBUtilities.nowInSeconds();
- DecoratedKey key = StorageService.getPartitioner().decorateKey(AsciiType.instance.fromString(keyspaceName));
+ DecoratedKey key = store.metadata.decorateKey(AsciiType.instance.fromString(keyspaceName));
SinglePartitionReadCommand command = SinglePartitionSliceCommand.create(store.metadata, nowInSec, key, slices);
try (OpOrder.Group op = store.readOrdering.start();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 1f1a3dd..b33ba76 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -38,7 +38,6 @@ import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.functions.*;
import org.apache.cassandra.cql3.statements.CFPropDefs;
-import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.db.marshal.*;
@@ -47,7 +46,6 @@ import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.compress.CompressionParameters;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -397,19 +395,24 @@ public final class SchemaKeyspace
return AsciiType.instance.fromString(ksName);
}
- private static DecoratedKey getSchemaKSDecoratedKey(String ksName)
+ private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, String keyspaceName, Function<RowIterator, T> fct)
{
- return StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
+ return readSchemaPartitionForKeyspaceAndApply(schemaTableName, getSchemaKSKey(keyspaceName), fct);
}
- private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, String keyspaceName, Function<RowIterator, T> fct)
+ private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, ByteBuffer keyspaceKey, Function<RowIterator, T> fct)
{
- return readSchemaPartitionForKeyspaceAndApply(schemaTableName, getSchemaKSDecoratedKey(keyspaceName), fct);
+ ColumnFamilyStore store = getSchemaCFS(schemaTableName);
+ return readSchemaPartitionForKeyspaceAndApply(store, store.decorateKey(keyspaceKey), fct);
}
private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, DecoratedKey keyspaceKey, Function<RowIterator, T> fct)
{
- ColumnFamilyStore store = getSchemaCFS(schemaTableName);
+ return readSchemaPartitionForKeyspaceAndApply(getSchemaCFS(schemaTableName), keyspaceKey, fct);
+ }
+
+ private static <T> T readSchemaPartitionForKeyspaceAndApply(ColumnFamilyStore store, DecoratedKey keyspaceKey, Function<RowIterator, T> fct)
+ {
int nowInSec = FBUtilities.nowInSeconds();
try (OpOrder.Group op = store.readOrdering.start();
RowIterator partition = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(store.metadata, nowInSec, keyspaceKey)
@@ -427,7 +430,7 @@ public final class SchemaKeyspace
Slices slices = Slices.with(comparator, Slice.make(comparator, tableName));
int nowInSec = FBUtilities.nowInSeconds();
try (OpOrder.Group op = store.readOrdering.start();
- RowIterator partition = UnfilteredRowIterators.filter(SinglePartitionSliceCommand.create(store.metadata, nowInSec, getSchemaKSDecoratedKey(keyspaceName), slices)
+ RowIterator partition = UnfilteredRowIterators.filter(SinglePartitionSliceCommand.create(store.metadata, nowInSec, getSchemaKSKey(keyspaceName), slices)
.queryMemtableAndDisk(store, op), nowInSec))
{
return fct.apply(partition);
@@ -698,7 +701,8 @@ public final class SchemaKeyspace
public static Mutation makeDropKeyspaceMutation(KeyspaceMetadata keyspace, long timestamp)
{
int nowInSec = FBUtilities.nowInSeconds();
- Mutation mutation = new Mutation(NAME, getSchemaKSDecoratedKey(keyspace.name));
+ Mutation mutation = new Mutation(NAME, Keyspaces.decorateKey(getSchemaKSKey(keyspace.name)));
+
for (CFMetaData schemaTable : All)
mutation.add(PartitionUpdate.fullPartitionDelete(schemaTable, mutation.key(), timestamp, nowInSec));
@@ -1085,7 +1089,16 @@ public final class SchemaKeyspace
boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
boolean isMaterializedView = flags.contains(CFMetaData.Flag.MATERIALIZEDVIEW);
- CFMetaData cfm = CFMetaData.create(keyspace, table, id, isDense, isCompound, isSuper, isCounter, isMaterializedView, columns);
+ CFMetaData cfm = CFMetaData.create(keyspace,
+ table,
+ id,
+ isDense,
+ isCompound,
+ isSuper,
+ isCounter,
+ isMaterializedView,
+ columns,
+ DatabaseDescriptor.getPartitioner());
Map<String, String> compaction = new HashMap<>(row.getTextMap("compaction"));
Class<? extends AbstractCompactionStrategy> compactionStrategyClass =
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index b1554e3..253e9e4 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -372,7 +372,7 @@ public class CacheService implements CacheServiceMBean
{
public Pair<CounterCacheKey, ClockAndCount> call() throws Exception
{
- DecoratedKey key = cfs.partitioner.decorateKey(partitionKey);
+ DecoratedKey key = cfs.decorateKey(partitionKey);
LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(cfs.metadata, cellName);
ColumnDefinition column = name.column;
CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
@@ -427,7 +427,7 @@ public class CacheService implements CacheServiceMBean
{
public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
{
- DecoratedKey key = cfs.partitioner.decorateKey(buffer);
+ DecoratedKey key = cfs.decorateKey(buffer);
int nowInSec = FBUtilities.nowInSeconds();
try (OpOrder.Group op = cfs.readOrdering.start(); UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key).queryMemtableAndDisk(cfs, op))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index be11c77..51aa48f 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -643,7 +643,7 @@ public class StorageProxy implements StorageProxyMBean
try
{
- Token baseToken = StorageService.getPartitioner().getToken(dataKey);
+ Token baseToken = StorageService.instance.getTokenMetadata().partitioner.getToken(dataKey);
ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
@@ -862,8 +862,11 @@ public class StorageProxy implements StorageProxyMBean
Keyspace.open(SystemKeyspace.NAME),
null,
WriteType.SIMPLE);
- Mutation mutation = new Mutation(SystemKeyspace.NAME, StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(uuid)));
- mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog, mutation.key(), FBUtilities.timestampMicros(), FBUtilities.nowInSeconds()));
+ Mutation mutation = new Mutation(
+ PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog,
+ UUIDType.instance.decompose(uuid),
+ FBUtilities.timestampMicros(),
+ FBUtilities.nowInSeconds()));
MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.BATCHLOG_MUTATION);
for (InetAddress target : endpoints)
{
@@ -1686,7 +1689,7 @@ public class StorageProxy implements StorageProxyMBean
public static List<InetAddress> getLiveSortedEndpoints(Keyspace keyspace, ByteBuffer key)
{
- return getLiveSortedEndpoints(keyspace, StorageService.getPartitioner().decorateKey(key));
+ return getLiveSortedEndpoints(keyspace, StorageService.instance.getTokenMetadata().decorateKey(key));
}
public static List<InetAddress> getLiveSortedEndpoints(Keyspace keyspace, RingPosition pos)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5049337..b51dd3a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -78,6 +78,7 @@ import org.apache.cassandra.dht.RangeStreamer;
import org.apache.cassandra.dht.RingPosition;
import org.apache.cassandra.dht.StreamStateStore;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.Token.TokenFactory;
import org.apache.cassandra.exceptions.AlreadyExistsException;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -188,7 +189,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
/* This abstraction maintains the token/endpoint metadata information */
private TokenMetadata tokenMetadata = new TokenMetadata();
- public volatile VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(getPartitioner());
+ public volatile VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(tokenMetadata.partitioner);
private Thread drainOnShutdown = null;
private boolean inShutdownHook = false;
@@ -200,11 +201,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return inShutdownHook;
}
- public static IPartitioner getPartitioner()
- {
- return DatabaseDescriptor.getPartitioner();
- }
-
public Collection<Range<Token>> getLocalRanges(String keyspaceName)
{
return getRangesForEndpoint(keyspaceName, FBUtilities.getBroadcastAddress());
@@ -510,7 +506,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null)
throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
- Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS))));
+ Collection<Token> tokens = TokenSerializer.deserialize(
+ tokenMetadata.partitioner,
+ new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(DatabaseDescriptor.getReplaceAddress(),
+ ApplicationState.TOKENS))));
SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
@@ -947,7 +946,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
bootstrapTokens = new ArrayList<>(initialTokens.size());
for (String token : initialTokens)
- bootstrapTokens.add(getPartitioner().getTokenFactory().fromString(token));
+ bootstrapTokens.add(getTokenFactory().fromString(token));
logger.info("Saved tokens not found. Using configuration value: {}", bootstrapTokens);
}
}
@@ -1509,7 +1508,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
throw new InvalidRequestException("There is no ring for the keyspace: " + keyspace);
List<TokenRange> ranges = new ArrayList<>();
- Token.TokenFactory tf = getPartitioner().getTokenFactory();
+ Token.TokenFactory tf = getTokenFactory();
Map<Range<Token>, List<InetAddress>> rangeToAddressMap =
includeOnlyLocalDC
@@ -1818,7 +1817,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
try
{
- return TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(endpoint, ApplicationState.TOKENS))));
+ return TokenSerializer.deserialize(
+ tokenMetadata.partitioner,
+ new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(endpoint, ApplicationState.TOKENS))));
}
catch (IOException e)
{
@@ -2049,7 +2050,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private void handleStateMoving(InetAddress endpoint, String[] pieces)
{
assert pieces.length >= 2;
- Token token = getPartitioner().getTokenFactory().fromString(pieces[1]);
+ Token token = getTokenFactory().fromString(pieces[1]);
if (logger.isDebugEnabled())
logger.debug("Node {} state moving, new token {}", endpoint, token);
@@ -2790,7 +2791,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public int repairAsync(String keyspace, Map<String, String> repairSpec)
{
- RepairOption option = RepairOption.parse(repairSpec, getPartitioner());
+ RepairOption option = RepairOption.parse(repairSpec, tokenMetadata.partitioner);
// if ranges are not specified
if (option.getRanges().isEmpty())
{
@@ -2974,8 +2975,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
@VisibleForTesting
Collection<Range<Token>> createRepairRangeFrom(String beginToken, String endToken)
{
- Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
- Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
+ Token parsedBeginToken = getTokenFactory().fromString(beginToken);
+ Token parsedEndToken = getTokenFactory().fromString(endToken);
// Break up given range to match ring layout in TokenMetadata
ArrayList<Range<Token>> repairingRange = new ArrayList<>();
@@ -3002,6 +3003,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return repairingRange;
}
+ public TokenFactory getTokenFactory()
+ {
+ return tokenMetadata.partitioner.getTokenFactory();
+ }
+
public int forceRepairAsync(String keyspace, RepairOption options)
{
if (options.getRanges().isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
@@ -3144,12 +3150,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (cfMetaData == null)
throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'");
- return getNaturalEndpoints(keyspaceName, getPartitioner().getToken(cfMetaData.getKeyValidator().fromString(key)));
+ return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(cfMetaData.getKeyValidator().fromString(key)));
}
public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key)
{
- return getNaturalEndpoints(keyspaceName, getPartitioner().getToken(key));
+ return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(key));
}
/**
@@ -3175,7 +3181,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
public List<InetAddress> getLiveNaturalEndpoints(Keyspace keyspace, ByteBuffer key)
{
- return getLiveNaturalEndpoints(keyspace, getPartitioner().decorateKey(key));
+ return getLiveNaturalEndpoints(keyspace, tokenMetadata.decorateKey(key));
}
public List<InetAddress> getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos)
@@ -3438,7 +3444,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
InetAddress preferred = SystemKeyspace.getPreferredIP(hintsDestinationHost);
// stream all hints -- range list will be a singleton of "the entire ring"
- Token token = StorageService.getPartitioner().getMinimumToken();
+ Token token = tokenMetadata.partitioner.getMinimumToken();
List<Range<Token>> ranges = Collections.singletonList(new Range<>(token, token));
return new StreamPlan("Hints").transferRanges(hintsDestinationHost,
@@ -3454,13 +3460,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
try
{
- getPartitioner().getTokenFactory().validate(newToken);
+ getTokenFactory().validate(newToken);
}
catch (ConfigurationException e)
{
throw new IOException(e.getMessage());
}
- move(getPartitioner().getTokenFactory().fromString(newToken));
+ move(getTokenFactory().fromString(newToken));
}
/**
@@ -3915,9 +3921,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
@VisibleForTesting
public IPartitioner setPartitionerUnsafe(IPartitioner newPartitioner)
{
- IPartitioner oldPartitioner = DatabaseDescriptor.getPartitioner();
- DatabaseDescriptor.setPartitioner(newPartitioner);
- valueFactory = new VersionedValue.VersionedValueFactory(getPartitioner());
+ IPartitioner oldPartitioner = DatabaseDescriptor.setPartitionerUnsafe(newPartitioner);
+ tokenMetadata = tokenMetadata.cloneWithNewPartitioner(newPartitioner);
+ valueFactory = new VersionedValue.VersionedValueFactory(newPartitioner);
return oldPartitioner;
}
@@ -3944,7 +3950,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
List<Token> sortedTokens = tokenMetadata.sortedTokens();
// describeOwnership returns tokens in an unspecified order, let's re-order them
- Map<Token, Float> tokenMap = new TreeMap<Token, Float>(getPartitioner().describeOwnership(sortedTokens));
+ Map<Token, Float> tokenMap = new TreeMap<Token, Float>(tokenMetadata.partitioner.describeOwnership(sortedTokens));
Map<InetAddress, Float> nodeMap = new LinkedHashMap<>();
for (Map.Entry<Token, Float> entry : tokenMap.entrySet())
{
@@ -4004,7 +4010,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
for (Collection<InetAddress> endpoints : sortedDcsToEndpoints.values())
endpointsGroupedByDc.add(endpoints);
- Map<Token, Float> tokenOwnership = getPartitioner().describeOwnership(tokenMetadata.sortedTokens());
+ Map<Token, Float> tokenOwnership = tokenMetadata.partitioner.describeOwnership(tokenMetadata.sortedTokens());
LinkedHashMap<InetAddress, Float> finalOwnership = Maps.newLinkedHashMap();
// calculate ownership per dc
@@ -4211,7 +4217,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
this.keyspace = keyspace;
try
{
- setPartitioner(DatabaseDescriptor.getPartitioner());
for (Map.Entry<Range<Token>, List<InetAddress>> entry : StorageService.instance.getRangeToAddressMap(keyspace).entrySet())
{
Range<Token> range = entry.getKey();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
index fffb4e1..4171694 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
@@ -19,10 +19,8 @@ package org.apache.cassandra.service.pager;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.service.StorageService;
/**
* Pages a RangeSliceCommand whose predicate is a name query.
@@ -44,7 +42,7 @@ public class RangeNamesQueryPager extends AbstractQueryPager
if (state != null)
{
- lastReturnedKey = StorageService.getPartitioner().decorateKey(state.partitionKey);
+ lastReturnedKey = command.metadata().decorateKey(state.partitionKey);
restoreState(lastReturnedKey, state.remaining, state.remainingInPartition);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 6c08be0..0139d9c 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -22,7 +22,6 @@ import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.service.StorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +46,7 @@ public class RangeSliceQueryPager extends AbstractQueryPager
if (state != null)
{
- lastReturnedKey = StorageService.getPartitioner().decorateKey(state.partitionKey);
+ lastReturnedKey = command.metadata().decorateKey(state.partitionKey);
lastReturnedClustering = LegacyLayout.decodeClustering(command.metadata(), state.cellName);
restoreState(lastReturnedKey, state.remaining, state.remainingInPartition);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/service/paxos/Commit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java
index 579c315..95bd464 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.service.paxos;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.UUID;
import com.google.common.base.Objects;
@@ -122,9 +123,9 @@ public class Commit
public Commit deserialize(DataInputPlus in, int version) throws IOException
{
- DecoratedKey key = null;
+ ByteBuffer key = null;
if (version < MessagingService.VERSION_30)
- key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+ key = ByteBufferUtil.readWithShortLength(in);
UUID ballot = UUIDSerializer.serializer.deserialize(in, version);
PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL, key);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 61656b3..a098786 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -193,7 +193,7 @@ public class StreamReader
public DecoratedKey newPartition() throws IOException
{
- key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+ key = metadata.decorateKey(ByteBufferUtil.readWithShortLength(in));
partitionLevelDeletion = DeletionTime.serializer.deserialize(in);
iterator = SSTableSimpleIterator.create(metadata, in, header, helper, partitionLevelDeletion);
staticRow = iterator.readStaticRow();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 0674c62..a7b8b07 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -549,7 +549,7 @@ public class CassandraServer implements Cassandra.Iface
for (ByteBuffer key: keys)
{
ThriftValidation.validateKey(metadata, key);
- DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+ DecoratedKey dk = metadata.decorateKey(key);
commands.add(SinglePartitionReadCommand.create(true, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, dk, filter));
}
@@ -617,7 +617,7 @@ public class CassandraServer implements Cassandra.Iface
filter = new ClusteringIndexNamesFilter(FBUtilities.singleton(cellname.clustering, metadata.comparator), false);
}
- DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+ DecoratedKey dk = metadata.decorateKey(key);
SinglePartitionReadCommand<?> command = SinglePartitionReadCommand.create(true, metadata, FBUtilities.nowInSeconds(), columns, RowFilter.NONE, DataLimits.NONE, dk, filter);
try (RowIterator result = PartitionIterators.getOnlyElement(read(Arrays.asList(command), consistencyLevel, cState), command))
@@ -694,22 +694,23 @@ public class CassandraServer implements Cassandra.Iface
ColumnFilter columnFilter;
ClusteringIndexFilter filter;
- if (cfs.metadata.isSuper() && !column_parent.isSetSuper_column())
+ CFMetaData metadata = cfs.metadata;
+ if (metadata.isSuper() && !column_parent.isSetSuper_column())
{
// If we count on a super column table without having set the super column name, we're in fact interested by the count of super columns
- columnFilter = ColumnFilter.all(cfs.metadata);
- filter = new ClusteringIndexSliceFilter(makeSlices(cfs.metadata, sliceRange), sliceRange.reversed);
+ columnFilter = ColumnFilter.all(metadata);
+ filter = new ClusteringIndexSliceFilter(makeSlices(metadata, sliceRange), sliceRange.reversed);
}
else
{
- columnFilter = makeColumnFilter(cfs.metadata, column_parent, sliceRange);
- filter = toInternalFilter(cfs.metadata, column_parent, sliceRange);
+ columnFilter = makeColumnFilter(metadata, column_parent, sliceRange);
+ filter = toInternalFilter(metadata, column_parent, sliceRange);
}
- DataLimits limits = getLimits(1, cfs.metadata.isSuper() && !column_parent.isSetSuper_column(), predicate);
- DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+ DataLimits limits = getLimits(1, metadata.isSuper() && !column_parent.isSetSuper_column(), predicate);
+ DecoratedKey dk = metadata.decorateKey(key);
- return QueryPagers.countPaged(cfs.metadata,
+ return QueryPagers.countPaged(metadata,
dk,
columnFilter,
filter,
@@ -821,11 +822,9 @@ public class CassandraServer implements Cassandra.Iface
org.apache.cassandra.db.Mutation mutation;
try
{
- DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
-
LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_parent.super_column, column.name);
Cell cell = cellFromColumn(metadata, name, column);
- PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, dk, BTreeBackedRow.singleCellRow(name.clustering, cell));
+ PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeBackedRow.singleCellRow(name.clustering, cell));
mutation = new org.apache.cassandra.db.Mutation(update);
}
@@ -913,7 +912,7 @@ public class CassandraServer implements Cassandra.Iface
for (Column column : updates)
ThriftValidation.validateColumnData(metadata, null, column);
- DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+ DecoratedKey dk = metadata.decorateKey(key);
int nowInSec = FBUtilities.nowInSeconds();
PartitionUpdate partitionUpdates = PartitionUpdate.fromIterator(LegacyLayout.toRowIterator(metadata, dk, toLegacyCells(metadata, updates, nowInSec).iterator(), nowInSec));
@@ -1080,7 +1079,6 @@ public class CassandraServer implements Cassandra.Iface
for (Map.Entry<ByteBuffer, Map<String, List<Mutation>>> mutationEntry: mutation_map.entrySet())
{
ByteBuffer key = mutationEntry.getKey();
- DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
// We need to separate mutation for standard cf and counter cf (that will be encapsulated in a
// CounterMutation) because it doesn't follow the same code path
@@ -1120,6 +1118,7 @@ public class CassandraServer implements Cassandra.Iface
}
sortAndMerge(metadata, cells, nowInSec);
+ DecoratedKey dk = metadata.decorateKey(key);
PartitionUpdate update = PartitionUpdate.fromIterator(LegacyLayout.toUnfilteredRowIterator(metadata, dk, delInfo, cells.iterator()));
org.apache.cassandra.db.Mutation mutation;
@@ -1320,7 +1319,7 @@ public class CassandraServer implements Cassandra.Iface
if (isCommutativeOp)
ThriftConversion.fromThrift(consistency_level).validateCounterForWrite(metadata);
- DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+ DecoratedKey dk = metadata.decorateKey(key);
int nowInSec = FBUtilities.nowInSeconds();
PartitionUpdate update;
@@ -1473,7 +1472,7 @@ public class CassandraServer implements Cassandra.Iface
org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
consistencyLevel.validateForRead(keyspace);
- IPartitioner p = StorageService.getPartitioner();
+ IPartitioner p = metadata.partitioner;
AbstractBounds<PartitionPosition> bounds;
if (range.start_key == null)
{
@@ -1558,7 +1557,7 @@ public class CassandraServer implements Cassandra.Iface
org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
consistencyLevel.validateForRead(keyspace);
- IPartitioner p = StorageService.getPartitioner();
+ IPartitioner p = metadata.partitioner;
AbstractBounds<PartitionPosition> bounds;
if (range.start_key == null)
{
@@ -1670,7 +1669,7 @@ public class CassandraServer implements Cassandra.Iface
org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
consistencyLevel.validateForRead(keyspace);
- IPartitioner p = StorageService.getPartitioner();
+ IPartitioner p = metadata.partitioner;
AbstractBounds<PartitionPosition> bounds = new Bounds<>(PartitionPosition.ForKey.get(index_clause.start_key, p),
p.getMinimumToken().minKeyBound());
@@ -1767,7 +1766,7 @@ public class CassandraServer implements Cassandra.Iface
public String describe_partitioner() throws TException
{
- return StorageService.getPartitioner().getClass().getName();
+ return StorageService.instance.getTokenMetadata().partitioner.getClass().getName();
}
public String describe_snitch() throws TException
@@ -1796,8 +1795,8 @@ public class CassandraServer implements Cassandra.Iface
{
try
{
- Token.TokenFactory tf = StorageService.getPartitioner().getTokenFactory();
- Range<Token> tr = new Range<>(tf.fromString(start_token), tf.fromString(end_token));
+ Token.TokenFactory tf = StorageService.instance.getTokenFactory();
+ Range<Token> tr = new Range<Token>(tf.fromString(start_token), tf.fromString(end_token));
List<Pair<Range<Token>, Long>> splits =
StorageService.instance.getSplits(state().getKeyspace(), cfName, tr, keys_per_split);
List<CfSplit> result = new ArrayList<>(splits.size());
@@ -2134,14 +2133,13 @@ public class CassandraServer implements Cassandra.Iface
try
{
LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_parent.super_column, column.name);
- DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
// See UpdateParameters.addCounter() for more details on this
ByteBuffer value = CounterContext.instance().createLocal(column.value);
CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
Cell cell = BufferCell.live(metadata, name.column, FBUtilities.timestampMicros(), value, path);
- PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, dk, BTreeBackedRow.singleCellRow(name.clustering, cell));
+ PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeBackedRow.singleCellRow(name.clustering, cell));
org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(update);
doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))));
@@ -2412,7 +2410,7 @@ public class CassandraServer implements Cassandra.Iface
DataLimits limits = getLimits(1, false, request.count);
ThriftValidation.validateKey(metadata, request.key);
- DecoratedKey dk = StorageService.getPartitioner().decorateKey(request.key);
+ DecoratedKey dk = metadata.decorateKey(request.key);
SinglePartitionReadCommand<?> cmd = SinglePartitionReadCommand.create(true, metadata, FBUtilities.nowInSeconds(), columns, RowFilter.NONE, limits, dk, filter);
return getSlice(Collections.<SinglePartitionReadCommand<?>>singletonList(cmd),
false,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index 20d6eba..36383e0 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -24,10 +24,10 @@ import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.cassandra.io.compress.ICompressor;
-
import org.apache.cassandra.cache.CachingOptions;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.Operator;
@@ -277,7 +277,7 @@ public class ThriftConversion
// We do not allow Thrift materialized views, so we always set it to false
boolean isMaterializedView = false;
- CFMetaData newCFMD = CFMetaData.create(cf_def.keyspace, cf_def.name, cfId, isDense, isCompound, isSuper, isCounter, isMaterializedView, defs);
+ CFMetaData newCFMD = CFMetaData.create(cf_def.keyspace, cf_def.name, cfId, isDense, isCompound, isSuper, isCounter, isMaterializedView, defs, DatabaseDescriptor.getPartitioner());
if (cf_def.isSetGc_grace_seconds())
newCFMD.gcGraceSeconds(cf_def.gc_grace_seconds);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
index 13c55aa..054b466 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -510,7 +509,7 @@ public class ThriftValidation
if (range.start_token != null && range.end_key != null)
throw new org.apache.cassandra.exceptions.InvalidRequestException("start token + end key is not a supported key range");
- IPartitioner p = StorageService.getPartitioner();
+ IPartitioner p = metadata.partitioner;
if (range.start_key != null && range.end_key != null)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
index 5354b43..435c505 100644
--- a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
+++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
@@ -24,10 +24,12 @@ import com.datastax.driver.core.*;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.Token.TokenFactory;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.schema.SchemaKeyspace;
@@ -64,11 +66,9 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
Metadata metadata = cluster.getMetadata();
- setPartitioner(metadata.getPartitioner());
-
Set<TokenRange> tokenRanges = metadata.getTokenRanges();
- Token.TokenFactory tokenFactory = getPartitioner().getTokenFactory();
+ TokenFactory tokenFactory = FBUtilities.newPartitioner(metadata.getPartitioner()).getTokenFactory();
for (TokenRange tokenRange : tokenRanges)
{
@@ -128,7 +128,16 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
for (Row colRow : session.execute(columnsQuery, keyspace, name))
defs.add(createDefinitionFromRow(colRow, keyspace, name));
- tables.put(name, CFMetaData.create(keyspace, name, id, isDense, isCompound, isSuper, isCounter, isMaterializedView, defs));
+ tables.put(name, CFMetaData.create(keyspace,
+ name,
+ id,
+ isDense,
+ isCompound,
+ isSuper,
+ isCounter,
+ isMaterializedView,
+ defs,
+ DatabaseDescriptor.getPartitioner()));
}
return tables;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java b/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java
index b49055d..b48336f 100644
--- a/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java
+++ b/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java
@@ -83,7 +83,6 @@ public class CQLSSTableWriterLongTest
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
.forTable(schema)
- .withPartitioner(StorageService.instance.getPartitioner())
.using(insert)
.withBufferSizeInMB(1)
.build();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java
index e052c0a..11892a8 100644
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -117,11 +117,11 @@ public class MockSchema
throw new RuntimeException(e);
}
}
- SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.EMPTY_LIST);
+ SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.emptyList());
StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
- .finalizeMetadata(Murmur3Partitioner.instance.getClass().getCanonicalName(), 0.01f, -1, header)
+ .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, -1, header)
.get(MetadataType.STATS);
- SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata, Murmur3Partitioner.instance,
+ SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata,
segmentedFile.sharedCopy(), segmentedFile.sharedCopy(), indexSummary.sharedCopy(),
new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header);
reader.first = reader.last = readerBounds(generation);
@@ -139,7 +139,7 @@ public class MockSchema
{
String cfname = "mockcf" + (id.incrementAndGet());
CFMetaData metadata = newCFMetaData(ksname, cfname);
- return new ColumnFamilyStore(ks, cfname, Murmur3Partitioner.instance, 0, metadata, new Directories(metadata), false, false);
+ return new ColumnFamilyStore(ks, cfname, 0, metadata, new Directories(metadata), false, false);
}
private static CFMetaData newCFMetaData(String ksname, String cfname)
@@ -148,6 +148,7 @@ public class MockSchema
.addPartitionKey("key", UTF8Type.instance)
.addClusteringColumn("col", UTF8Type.instance)
.addRegularColumn("value", UTF8Type.instance)
+ .withPartitioner(Murmur3Partitioner.instance)
.build();
metadata.caching(CachingOptions.NONE);
return metadata;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/UpdateBuilder.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/UpdateBuilder.java b/test/unit/org/apache/cassandra/UpdateBuilder.java
index b2d1d7f..3a5fbe6 100644
--- a/test/unit/org/apache/cassandra/UpdateBuilder.java
+++ b/test/unit/org/apache/cassandra/UpdateBuilder.java
@@ -114,6 +114,6 @@ public class UpdateBuilder
return (DecoratedKey)partitionKey[0];
ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey));
- return StorageService.getPartitioner().decorateKey(key);
+ return metadata.decorateKey(key);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index e97af68..254c21c 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -32,24 +32,21 @@ import java.util.function.Supplier;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.Slice.Bound;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.compaction.AbstractCompactionTask;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
@@ -68,24 +65,29 @@ public class Util
{
private static List<UUID> hostIdPool = new ArrayList<UUID>();
+ public static IPartitioner testPartitioner()
+ {
+ return DatabaseDescriptor.getPartitioner();
+ }
+
public static DecoratedKey dk(String key)
{
- return StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(key));
+ return testPartitioner().decorateKey(ByteBufferUtil.bytes(key));
}
- public static DecoratedKey dk(String key, AbstractType type)
+ public static DecoratedKey dk(String key, AbstractType<?> type)
{
- return StorageService.getPartitioner().decorateKey(type.fromString(key));
+ return testPartitioner().decorateKey(type.fromString(key));
}
public static DecoratedKey dk(ByteBuffer key)
{
- return StorageService.getPartitioner().decorateKey(key);
+ return testPartitioner().decorateKey(key);
}
public static PartitionPosition rp(String key)
{
- return rp(key, StorageService.getPartitioner());
+ return rp(key, testPartitioner());
}
public static PartitionPosition rp(String key, IPartitioner partitioner)
@@ -107,7 +109,7 @@ public class Util
public static Token token(String key)
{
- return StorageService.getPartitioner().getToken(ByteBufferUtil.bytes(key));
+ return testPartitioner().getToken(ByteBufferUtil.bytes(key));
}
public static Range<PartitionPosition> range(String left, String right)
@@ -264,7 +266,7 @@ public class Util
return (DecoratedKey)partitionKey[0];
ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey));
- return StorageService.getPartitioner().decorateKey(key);
+ return metadata.decorateKey(key);
}
public static void assertEmptyUnfiltered(ReadCommand command)
@@ -501,4 +503,27 @@ public class Util
}
assertEquals(expected, s.get());
}
+
+ public static PartitionerSwitcher switchPartitioner(IPartitioner p)
+ {
+ return new PartitionerSwitcher(p);
+ }
+
+ public static class PartitionerSwitcher implements AutoCloseable
+ {
+ final IPartitioner oldP;
+ final IPartitioner newP;
+
+ public PartitionerSwitcher(IPartitioner partitioner)
+ {
+ newP = partitioner;
+ oldP = StorageService.instance.setPartitionerUnsafe(partitioner);
+ }
+
+ public void close()
+ {
+ IPartitioner p = StorageService.instance.setPartitionerUnsafe(oldP);
+ assert p == newP;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
index cf05fe8..ced6343 100644
--- a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
+++ b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
@@ -133,7 +133,6 @@ public class CFMetaDataTest
private void checkInverses(CFMetaData cfm) throws Exception
{
- DecoratedKey k = StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(cfm.ksName));
KeyspaceMetadata keyspace = Schema.instance.getKSMetaData(cfm.ksName);
// Test thrift conversion
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java b/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
index 45994c7..35d57d9 100644
--- a/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
+++ b/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
@@ -8,9 +8,6 @@ import com.datastax.driver.core.Statement;
import static org.junit.Assert.assertEquals;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Murmur3Partitioner;
-
public class IndexQueryPagingTest extends CQLTester
{
/*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
index d0df5bc..5831e4a 100644
--- a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
+++ b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
@@ -33,7 +33,7 @@ public class SelectionColumnMappingTest extends CQLTester
@BeforeClass
public static void setUpClass()
{
- DatabaseDescriptor.setPartitioner(ByteOrderedPartitioner.instance);
+ DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
index c76d618..e07e421 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
@@ -17,13 +17,13 @@
*/
package org.apache.cassandra.cql3.validation.entities;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.service.StorageService;
import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -40,7 +40,8 @@ public class FrozenCollectionsTest extends CQLTester
@BeforeClass
public static void setUpClass()
{
- DatabaseDescriptor.setPartitioner(new ByteOrderedPartitioner());
+ // Selecting partitioner for a table is not exposed on CREATE TABLE.
+ StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
index 7f8fa0b..7a0e8c8 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
@@ -44,7 +44,7 @@ public class JsonTest extends CQLTester
@BeforeClass
public static void setUp()
{
- DatabaseDescriptor.setPartitioner(ByteOrderedPartitioner.instance);
+ DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexOnMapEntriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexOnMapEntriesTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexOnMapEntriesTest.java
index fb0d027..b69948f 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexOnMapEntriesTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexOnMapEntriesTest.java
@@ -38,7 +38,7 @@ public class SecondaryIndexOnMapEntriesTest extends CQLTester
@BeforeClass
public static void setUp()
{
- DatabaseDescriptor.setPartitioner(ByteOrderedPartitioner.instance);
+ DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java
index 7274cd4..9430f6c 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java
@@ -22,16 +22,17 @@ import java.util.UUID;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.service.StorageService;
public class UserTypesTest extends CQLTester
{
@BeforeClass
public static void setUpClass()
{
- DatabaseDescriptor.setPartitioner(new ByteOrderedPartitioner());
+ // Selecting partitioner for a table is not exposed on CREATE TABLE.
+ StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
index f1e2f55..cf028a1 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
@@ -13,7 +13,7 @@ public class SelectLimitTest extends CQLTester
@BeforeClass
public static void setUp()
{
- DatabaseDescriptor.setPartitioner(ByteOrderedPartitioner.instance);
+ DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderedPartitionerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderedPartitionerTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderedPartitionerTest.java
index 5152ba9..8a2e1c9 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderedPartitionerTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderedPartitionerTest.java
@@ -19,7 +19,7 @@ public class SelectOrderedPartitionerTest extends CQLTester
@BeforeClass
public static void setUp()
{
- DatabaseDescriptor.setPartitioner(ByteOrderedPartitioner.instance);
+ DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index c578c5e..5f1523e 100644
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@ -89,14 +89,13 @@ public class BatchlogManagerTest
.build()
.applyUnsafe();
- DecoratedKey dk = StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes("1234"));
+ DecoratedKey dk = cfs.decorateKey(ByteBufferUtil.bytes("1234"));
ArrayBackedPartition results = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, dk).build());
Iterator<Row> iter = results.iterator();
assert iter.hasNext();
- Mutation mutation = new Mutation(KEYSPACE1, dk);
- mutation.add(PartitionUpdate.fullPartitionDelete(cfm,
- mutation.key(),
+ Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(cfm,
+ dk,
FBUtilities.timestampMicros(),
FBUtilities.nowInSeconds()));
mutation.applyUnsafe();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a08525a/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index b53f62c..b89b792 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -88,7 +88,7 @@ public class RowCacheTest
CacheService.instance.setRowCacheCapacityInMB(1);
ByteBuffer key = ByteBufferUtil.bytes("rowcachekey");
- DecoratedKey dk = cachedStore.partitioner.decorateKey(key);
+ DecoratedKey dk = cachedStore.decorateKey(key);
RowCacheKey rck = new RowCacheKey(cachedStore.metadata.cfId, dk);
RowUpdateBuilder rub = new RowUpdateBuilder(cachedStore.metadata, System.currentTimeMillis(), key);
@@ -255,7 +255,7 @@ public class RowCacheTest
CacheService.instance.setRowCacheCapacityInMB(1);
ByteBuffer key = ByteBufferUtil.bytes("rowcachekey");
- DecoratedKey dk = cachedStore.partitioner.decorateKey(key);
+ DecoratedKey dk = cachedStore.decorateKey(key);
RowCacheKey rck = new RowCacheKey(cachedStore.metadata.cfId, dk);
String values[] = new String[200];
for (int i = 0; i < 200; i++)