You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:48:02 UTC
[38/51] [partial] cassandra git commit: Storage engine refactor,
a.k.a CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/MultiCBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MultiCBuilder.java b/src/java/org/apache/cassandra/db/MultiCBuilder.java
new file mode 100644
index 0000000..36a03ba
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/MultiCBuilder.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import static java.util.Collections.singletonList;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Builder that allows building multiple Clustering/Slice.Bound at the same time.
+ */
+public abstract class MultiCBuilder
+{
+ /**
+ * Creates a new empty {@code MultiCBuilder}.
+ */
+ public static MultiCBuilder create(ClusteringComparator comparator)
+ {
+ return new ConcreteMultiCBuilder(comparator);
+ }
+
+ /**
+ * Wraps an existing {@code CBuilder} to provide it with a MultiCBuilder interface
+ * for the sake of passing it to {@link Restriction#appendTo}. The resulting
+ * {@code MultiCBuilder} will still only be able to create a single clustering/bound
+ * and an {@code IllegalArgumentException} will be thrown if elements are added that
+ * would correspond to building multiple clusterings.
+ */
+ public static MultiCBuilder wrap(final CBuilder builder)
+ {
+ return new MultiCBuilder()
+ {
+ private boolean containsNull;
+ private boolean containsUnset;
+ private boolean hasMissingElements;
+
+ public MultiCBuilder addElementToAll(ByteBuffer value)
+ {
+ builder.add(value);
+
+ if (value == null)
+ containsNull = true;
+ if (value == ByteBufferUtil.UNSET_BYTE_BUFFER)
+ containsUnset = true;
+
+ return this;
+ }
+
+ public MultiCBuilder addEachElementToAll(List<ByteBuffer> values)
+ {
+ if (values.isEmpty())
+ {
+ hasMissingElements = true;
+ return this;
+ }
+
+ if (values.size() > 1)
+ throw new IllegalArgumentException();
+
+ return addElementToAll(values.get(0));
+ }
+
+ public MultiCBuilder addAllElementsToAll(List<List<ByteBuffer>> values)
+ {
+ if (values.isEmpty())
+ {
+ hasMissingElements = true;
+ return this;
+ }
+
+ if (values.size() > 1)
+ throw new IllegalArgumentException();
+
+ return addEachElementToAll(values.get(0));
+ }
+
+ public int remainingCount()
+ {
+ return builder.remainingCount();
+ }
+
+ public boolean containsNull()
+ {
+ return containsNull;
+ }
+
+ public boolean containsUnset()
+ {
+ return containsUnset;
+ }
+
+ public boolean hasMissingElements()
+ {
+ return hasMissingElements;
+ }
+
+ public NavigableSet<Clustering> build()
+ {
+ return FBUtilities.singleton(builder.build(), builder.comparator());
+ }
+
+ public SortedSet<Slice.Bound> buildBound(boolean isStart, boolean isInclusive)
+ {
+ return FBUtilities.singleton(builder.buildBound(isStart, isInclusive), builder.comparator());
+ }
+ };
+ }
+
+ /**
+ * Adds the specified element to all the clusterings.
+ * <p>
+ * If this builder contains 2 clusterings: A-B and A-C, a call to this method to add D will result in the clusterings:
+ * A-B-D and A-C-D.
+ * </p>
+ *
+ * @param value the value of the next element
+ * @return this <code>MultiCBuilder</code>
+ */
+ public abstract MultiCBuilder addElementToAll(ByteBuffer value);
+
+ /**
+ * Adds individually each of the specified elements to the end of all of the existing clusterings.
+ * <p>
+ * If this builder contains 2 clusterings: A-B and A-C a call to this method to add D and E will result in the 4
+ * clusterings: A-B-D, A-B-E, A-C-D and A-C-E.
+ * </p>
+ *
+ * @param values the elements to add
+ * @return this <code>MultiCBuilder</code>
+ */
+ public abstract MultiCBuilder addEachElementToAll(List<ByteBuffer> values);
+
+ /**
+ * Adds individually each of the specified lists of elements to the end of all of the existing clusterings.
+ * <p>
+ * If this builder contains 2 clusterings: A-B and A-C, a call to this method to add [[D, E], [F, G]] will result in the 4
+ * clusterings: A-B-D-E, A-B-F-G, A-C-D-E and A-C-F-G.
+ * </p>
+ *
+ * @param values the lists of elements to add
+ * @return this <code>MultiCBuilder</code>
+ */
+ public abstract MultiCBuilder addAllElementsToAll(List<List<ByteBuffer>> values);
+
+ /**
+ * Returns the number of elements that can be added to the clusterings.
+ *
+ * @return the number of elements that can be added to the clusterings.
+ */
+ public abstract int remainingCount();
+
+ /**
+ * Checks if the clusterings contain null elements.
+ *
+ * @return <code>true</code> if the clusterings contain <code>null</code> elements, <code>false</code> otherwise.
+ */
+ public abstract boolean containsNull();
+
+ /**
+ * Checks if the clusterings contain unset elements.
+ *
+ * @return <code>true</code> if the clusterings contain <code>unset</code> elements, <code>false</code> otherwise.
+ */
+ public abstract boolean containsUnset();
+
+ /**
+ * Checks if an empty list of values has been added.
+ * @return <code>true</code> if the clusterings have some missing elements, <code>false</code> otherwise.
+ */
+ public abstract boolean hasMissingElements();
+
+ /**
+ * Builds the <code>clusterings</code>.
+ *
+ * @return the clusterings
+ */
+ public abstract NavigableSet<Clustering> build();
+
+ /**
+ * Builds the bounds of the <code>clusterings</code>, as start or end bounds, inclusive or exclusive, as specified.
+ *
+ * @return the bounds
+ */
+ public abstract SortedSet<Slice.Bound> buildBound(boolean isStart, boolean isInclusive);
+
+ /**
+ * Checks if some elements can still be added to the clusterings.
+ *
+ * @return <code>true</code> if it is possible to add more elements to the clusterings, <code>false</code> otherwise.
+ */
+ public boolean hasRemaining()
+ {
+ return remainingCount() > 0;
+ }
+
+
+ private static class ConcreteMultiCBuilder extends MultiCBuilder
+ {
+ /**
+ * The table comparator.
+ */
+ private final ClusteringComparator comparator;
+
+ /**
+ * The elements of the clusterings
+ */
+ private final List<List<ByteBuffer>> elementsList = new ArrayList<>();
+
+ /**
+ * The number of elements that have been added.
+ */
+ private int size;
+
+ /**
+ * <code>true</code> if the clusterings have been built, <code>false</code> otherwise.
+ */
+ private boolean built;
+
+ /**
+ * <code>true</code> if the clusterings contain some <code>null</code> elements.
+ */
+ private boolean containsNull;
+
+ /**
+ * <code>true</code> if the clusterings contain some <code>unset</code> elements.
+ */
+ private boolean containsUnset;
+
+ /**
+ * <code>true</code> if some empty collection has been added.
+ */
+ private boolean hasMissingElements;
+
+ public ConcreteMultiCBuilder(ClusteringComparator comparator)
+ {
+ this.comparator = comparator;
+ }
+
+ public MultiCBuilder addElementToAll(ByteBuffer value)
+ {
+ checkUpdateable();
+
+ if (isEmpty())
+ elementsList.add(new ArrayList<ByteBuffer>());
+
+ for (int i = 0, m = elementsList.size(); i < m; i++)
+ {
+ if (value == null)
+ containsNull = true;
+ if (value == ByteBufferUtil.UNSET_BYTE_BUFFER)
+ containsUnset = true;
+
+ elementsList.get(i).add(value);
+ }
+ size++;
+ return this;
+ }
+
+ public MultiCBuilder addEachElementToAll(List<ByteBuffer> values)
+ {
+ checkUpdateable();
+
+ if (isEmpty())
+ elementsList.add(new ArrayList<ByteBuffer>());
+
+ if (values.isEmpty())
+ {
+ hasMissingElements = true;
+ }
+ else
+ {
+ for (int i = 0, m = elementsList.size(); i < m; i++)
+ {
+ List<ByteBuffer> oldComposite = elementsList.remove(0);
+
+ for (int j = 0, n = values.size(); j < n; j++)
+ {
+ List<ByteBuffer> newComposite = new ArrayList<>(oldComposite);
+ elementsList.add(newComposite);
+
+ ByteBuffer value = values.get(j);
+
+ if (value == null)
+ containsNull = true;
+ if (value == ByteBufferUtil.UNSET_BYTE_BUFFER)
+ containsUnset = true;
+
+ newComposite.add(values.get(j));
+ }
+ }
+ }
+ size++;
+ return this;
+ }
+
+ public MultiCBuilder addAllElementsToAll(List<List<ByteBuffer>> values)
+ {
+ checkUpdateable();
+
+ if (isEmpty())
+ elementsList.add(new ArrayList<ByteBuffer>());
+
+ if (values.isEmpty())
+ {
+ hasMissingElements = true;
+ }
+ else
+ {
+ for (int i = 0, m = elementsList.size(); i < m; i++)
+ {
+ List<ByteBuffer> oldComposite = elementsList.remove(0);
+
+ for (int j = 0, n = values.size(); j < n; j++)
+ {
+ List<ByteBuffer> newComposite = new ArrayList<>(oldComposite);
+ elementsList.add(newComposite);
+
+ List<ByteBuffer> value = values.get(j);
+
+ if (value.isEmpty())
+ hasMissingElements = true;
+
+ if (value.contains(null))
+ containsNull = true;
+ if (value.contains(ByteBufferUtil.UNSET_BYTE_BUFFER))
+ containsUnset = true;
+
+ newComposite.addAll(value);
+ }
+ }
+ size += values.get(0).size();
+ }
+ return this;
+ }
+
+ public int remainingCount()
+ {
+ return comparator.size() - size;
+ }
+
+ /**
+ * Checks if this builder is empty.
+ *
+ * @return <code>true</code> if this builder is empty, <code>false</code> otherwise.
+ */
+ public boolean isEmpty()
+ {
+ return elementsList.isEmpty();
+ }
+
+ public boolean containsNull()
+ {
+ return containsNull;
+ }
+
+ public boolean containsUnset()
+ {
+ return containsUnset;
+ }
+
+ public boolean hasMissingElements()
+ {
+ return hasMissingElements;
+ }
+
+ public NavigableSet<Clustering> build()
+ {
+ built = true;
+
+ if (hasMissingElements)
+ return FBUtilities.emptySortedSet(comparator);
+
+ CBuilder builder = CBuilder.create(comparator);
+
+ if (elementsList.isEmpty())
+ return FBUtilities.singleton(builder.build(), builder.comparator());
+
+ // Use a TreeSet to sort and eliminate duplicates
+ NavigableSet<Clustering> set = new TreeSet<>(builder.comparator());
+
+ for (int i = 0, m = elementsList.size(); i < m; i++)
+ {
+ List<ByteBuffer> elements = elementsList.get(i);
+ set.add(builder.buildWith(elements));
+ }
+ return set;
+ }
+
+ public SortedSet<Slice.Bound> buildBound(boolean isStart, boolean isInclusive)
+ {
+ built = true;
+
+ if (hasMissingElements)
+ return FBUtilities.emptySortedSet(comparator);
+
+ CBuilder builder = CBuilder.create(comparator);
+
+ if (elementsList.isEmpty())
+ return FBUtilities.singleton(builder.buildBound(isStart, isInclusive), comparator);
+
+ // Use a TreeSet to sort and eliminate duplicates
+ SortedSet<Slice.Bound> set = new TreeSet<>(comparator);
+
+ for (int i = 0, m = elementsList.size(); i < m; i++)
+ {
+ List<ByteBuffer> elements = elementsList.get(i);
+ set.add(builder.buildBoundWith(elements, isStart, isInclusive));
+ }
+ return set;
+ }
+
+ private void checkUpdateable()
+ {
+ if (!hasRemaining() || built)
+ throw new IllegalStateException("this builder cannot be updated anymore");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 9dd1686..355d259 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db;
import java.io.DataInput;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.*;
import org.apache.commons.lang3.StringUtils;
@@ -27,13 +26,15 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,37 +52,27 @@ public class Mutation implements IMutation
// when we remove it, also restore SerializationsTest.testMutationRead to not regenerate new Mutations each test
private final String keyspaceName;
- private final ByteBuffer key;
+ private final DecoratedKey key;
// map of column family id to mutations for that column family.
- private final Map<UUID, ColumnFamily> modifications;
-
- public Mutation(String keyspaceName, ByteBuffer key)
- {
- this(keyspaceName, key, new HashMap<UUID, ColumnFamily>());
- }
+ private final Map<UUID, PartitionUpdate> modifications;
- public Mutation(String keyspaceName, ByteBuffer key, ColumnFamily cf)
+ public Mutation(String keyspaceName, DecoratedKey key)
{
- this(keyspaceName, key, Collections.singletonMap(cf.id(), cf));
+ this(keyspaceName, key, new HashMap<UUID, PartitionUpdate>());
}
- public Mutation(String keyspaceName, Row row)
+ public Mutation(PartitionUpdate update)
{
- this(keyspaceName, row.key.getKey(), row.cf);
+ this(update.metadata().ksName, update.partitionKey(), Collections.singletonMap(update.metadata().cfId, update));
}
- protected Mutation(String keyspaceName, ByteBuffer key, Map<UUID, ColumnFamily> modifications)
+ protected Mutation(String keyspaceName, DecoratedKey key, Map<UUID, PartitionUpdate> modifications)
{
this.keyspaceName = keyspaceName;
this.key = key;
this.modifications = modifications;
}
- public Mutation(ByteBuffer key, ColumnFamily cf)
- {
- this(cf.metadata().ksName, key, cf);
- }
-
public Mutation copy()
{
Mutation copy = new Mutation(keyspaceName, key, new HashMap<>(modifications));
@@ -98,53 +89,34 @@ public class Mutation implements IMutation
return modifications.keySet();
}
- public ByteBuffer key()
+ public DecoratedKey key()
{
return key;
}
- public Collection<ColumnFamily> getColumnFamilies()
+ public Collection<PartitionUpdate> getPartitionUpdates()
{
return modifications.values();
}
- public ColumnFamily getColumnFamily(UUID cfId)
+ public PartitionUpdate getPartitionUpdate(UUID cfId)
{
return modifications.get(cfId);
}
- /*
- * Specify a column family name and the corresponding column
- * family object.
- * param @ cf - column family name
- * param @ columnFamily - the column family.
- */
- public void add(ColumnFamily columnFamily)
+ public Mutation add(PartitionUpdate update)
{
- assert columnFamily != null;
- ColumnFamily prev = modifications.put(columnFamily.id(), columnFamily);
+ assert update != null;
+ PartitionUpdate prev = modifications.put(update.metadata().cfId, update);
if (prev != null)
// developer error
- throw new IllegalArgumentException("Table " + columnFamily + " already has modifications in this mutation: " + prev);
- }
-
- /**
- * @return the ColumnFamily in this Mutation corresponding to @param cfName, creating an empty one if necessary.
- */
- public ColumnFamily addOrGet(String cfName)
- {
- return addOrGet(Schema.instance.getCFMetaData(keyspaceName, cfName));
+ throw new IllegalArgumentException("Table " + update.metadata().cfName + " already has modifications in this mutation: " + prev);
+ return this;
}
- public ColumnFamily addOrGet(CFMetaData cfm)
+ public PartitionUpdate get(CFMetaData cfm)
{
- ColumnFamily cf = modifications.get(cfm.cfId);
- if (cf == null)
- {
- cf = ArrayBackedSortedColumns.factory.create(cfm);
- modifications.put(cfm.cfId, cf);
- }
- return cf;
+ return modifications.get(cfm.cfId);
}
public boolean isEmpty()
@@ -152,56 +124,56 @@ public class Mutation implements IMutation
return modifications.isEmpty();
}
- public void add(String cfName, CellName name, ByteBuffer value, long timestamp, int timeToLive)
- {
- addOrGet(cfName).addColumn(name, value, timestamp, timeToLive);
- }
-
- public void addCounter(String cfName, CellName name, long value)
- {
- addOrGet(cfName).addCounter(name, value);
- }
-
- public void add(String cfName, CellName name, ByteBuffer value, long timestamp)
- {
- add(cfName, name, value, timestamp, 0);
- }
-
- public void delete(String cfName, long timestamp)
+ /**
+ * Creates a new mutation that merges all the provided mutations.
+ *
+ * @param mutations the mutations to merge together. All mutations must be
+ * on the same keyspace and partition key. There should also be at least one
+ * mutation.
+ * @return a mutation that contains all the modifications contained in {@code mutations}.
+ *
+ * @throws IllegalArgumentException if not all the mutations are on the same
+ * keyspace and key.
+ */
+ public static Mutation merge(List<Mutation> mutations)
{
- int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
- addOrGet(cfName).delete(new DeletionInfo(timestamp, localDeleteTime));
- }
+ assert !mutations.isEmpty();
- public void delete(String cfName, CellName name, long timestamp)
- {
- int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
- addOrGet(cfName).addTombstone(name, localDeleteTime, timestamp);
- }
+ if (mutations.size() == 1)
+ return mutations.get(0);
- public void deleteRange(String cfName, Composite start, Composite end, long timestamp)
- {
- int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
- addOrGet(cfName).addAtom(new RangeTombstone(start, end, timestamp, localDeleteTime));
- }
+ Set<UUID> updatedTables = new HashSet<>();
+ String ks = null;
+ DecoratedKey key = null;
+ for (Mutation mutation : mutations)
+ {
+ updatedTables.addAll(mutation.modifications.keySet());
+ if (ks != null && !ks.equals(mutation.keyspaceName))
+ throw new IllegalArgumentException();
+ if (key != null && !key.equals(mutation.key))
+ throw new IllegalArgumentException();
+ ks = mutation.keyspaceName;
+ key = mutation.key;
+ }
- public void addAll(IMutation m)
- {
- if (!(m instanceof Mutation))
- throw new IllegalArgumentException();
+ List<PartitionUpdate> updates = new ArrayList<>(mutations.size());
+ Map<UUID, PartitionUpdate> modifications = new HashMap<>(updatedTables.size());
+ for (UUID table : updatedTables)
+ {
+ for (Mutation mutation : mutations)
+ {
+ PartitionUpdate upd = mutation.modifications.get(table);
+ if (upd != null)
+ updates.add(upd);
+ }
- Mutation mutation = (Mutation)m;
- if (!keyspaceName.equals(mutation.keyspaceName) || !key.equals(mutation.key))
- throw new IllegalArgumentException();
+ if (updates.isEmpty())
+ continue;
- for (Map.Entry<UUID, ColumnFamily> entry : mutation.modifications.entrySet())
- {
- // It's slighty faster to assume the key wasn't present and fix if
- // not in the case where it wasn't there indeed.
- ColumnFamily cf = modifications.put(entry.getKey(), entry.getValue());
- if (cf != null)
- entry.getValue().addAll(cf);
+ modifications.put(table, updates.size() == 1 ? updates.get(0) : PartitionUpdate.merge(updates));
+ updates.clear();
}
+ return new Mutation(ks, key, modifications);
}
/*
@@ -243,7 +215,7 @@ public class Mutation implements IMutation
{
StringBuilder buff = new StringBuilder("Mutation(");
buff.append("keyspace='").append(keyspaceName).append('\'');
- buff.append(", key='").append(ByteBufferUtil.bytesToHex(key)).append('\'');
+ buff.append(", key='").append(ByteBufferUtil.bytesToHex(key.getKey())).append('\'');
buff.append(", modifications=[");
if (shallow)
{
@@ -256,14 +228,16 @@ public class Mutation implements IMutation
buff.append(StringUtils.join(cfnames, ", "));
}
else
- buff.append(StringUtils.join(modifications.values(), ", "));
+ {
+ buff.append("\n ").append(StringUtils.join(modifications.values(), "\n ")).append("\n");
+ }
return buff.append("])").toString();
}
public Mutation without(UUID cfId)
{
Mutation mutation = new Mutation(keyspaceName, key);
- for (Map.Entry<UUID, ColumnFamily> entry : modifications.entrySet())
+ for (Map.Entry<UUID, PartitionUpdate> entry : modifications.entrySet())
if (!entry.getKey().equals(cfId))
mutation.add(entry.getValue());
return mutation;
@@ -276,58 +250,52 @@ public class Mutation implements IMutation
if (version < MessagingService.VERSION_20)
out.writeUTF(mutation.getKeyspaceName());
- ByteBufferUtil.writeWithShortLength(mutation.key(), out);
+ if (version < MessagingService.VERSION_30)
+ ByteBufferUtil.writeWithShortLength(mutation.key().getKey(), out);
/* serialize the modifications in the mutation */
int size = mutation.modifications.size();
out.writeInt(size);
assert size > 0;
- for (Map.Entry<UUID, ColumnFamily> entry : mutation.modifications.entrySet())
- ColumnFamily.serializer.serialize(entry.getValue(), out, version);
+ for (Map.Entry<UUID, PartitionUpdate> entry : mutation.modifications.entrySet())
+ PartitionUpdate.serializer.serialize(entry.getValue(), out, version);
}
- public Mutation deserialize(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
+ public Mutation deserialize(DataInput in, int version, SerializationHelper.Flag flag) throws IOException
{
String keyspaceName = null; // will always be set from cf.metadata but javac isn't smart enough to see that
if (version < MessagingService.VERSION_20)
keyspaceName = in.readUTF();
- ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
+ DecoratedKey key = null;
+ if (version < MessagingService.VERSION_30)
+ key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+
int size = in.readInt();
assert size > 0;
- Map<UUID, ColumnFamily> modifications;
if (size == 1)
+ return new Mutation(PartitionUpdate.serializer.deserialize(in, version, flag, key));
+
+ Map<UUID, PartitionUpdate> modifications = new HashMap<>(size);
+ PartitionUpdate update = null;
+ for (int i = 0; i < size; ++i)
{
- ColumnFamily cf = deserializeOneCf(in, version, flag);
- modifications = Collections.singletonMap(cf.id(), cf);
- keyspaceName = cf.metadata().ksName;
- }
- else
- {
- modifications = new HashMap<UUID, ColumnFamily>(size);
- for (int i = 0; i < size; ++i)
- {
- ColumnFamily cf = deserializeOneCf(in, version, flag);
- modifications.put(cf.id(), cf);
- keyspaceName = cf.metadata().ksName;
- }
+ update = PartitionUpdate.serializer.deserialize(in, version, flag, key);
+ modifications.put(update.metadata().cfId, update);
}
- return new Mutation(keyspaceName, key, modifications);
- }
+ if (keyspaceName == null)
+ keyspaceName = update.metadata().ksName;
+ if (key == null)
+ key = update.partitionKey();
- private ColumnFamily deserializeOneCf(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
- {
- ColumnFamily cf = ColumnFamily.serializer.deserialize(in, ArrayBackedSortedColumns.factory, flag, version);
- // We don't allow Mutation with null column family, so we should never get null back.
- assert cf != null;
- return cf;
+ return new Mutation(keyspaceName, key, modifications);
}
public Mutation deserialize(DataInput in, int version) throws IOException
{
- return deserialize(in, version, ColumnSerializer.Flag.FROM_REMOTE);
+ return deserialize(in, version, SerializationHelper.Flag.FROM_REMOTE);
}
public long serializedSize(Mutation mutation, int version)
@@ -338,12 +306,15 @@ public class Mutation implements IMutation
if (version < MessagingService.VERSION_20)
size += sizes.sizeof(mutation.getKeyspaceName());
- int keySize = mutation.key().remaining();
- size += sizes.sizeof((short) keySize) + keySize;
+ if (version < MessagingService.VERSION_30)
+ {
+ int keySize = mutation.key().getKey().remaining();
+ size += sizes.sizeof((short) keySize) + keySize;
+ }
size += sizes.sizeof(mutation.modifications.size());
- for (Map.Entry<UUID,ColumnFamily> entry : mutation.modifications.entrySet())
- size += ColumnFamily.serializer.serializedSize(entry.getValue(), TypeSizes.NATIVE, version);
+ for (Map.Entry<UUID, PartitionUpdate> entry : mutation.modifications.entrySet())
+ size += PartitionUpdate.serializer.serializedSize(entry.getValue(), version, sizes);
return size;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/NativeCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeCell.java b/src/java/org/apache/cassandra/db/NativeCell.java
deleted file mode 100644
index dac5674..0000000
--- a/src/java/org/apache/cassandra/db/NativeCell.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-import org.apache.cassandra.utils.memory.NativeAllocator;
-
-public class NativeCell extends AbstractNativeCell
-{
- private static final long SIZE = ObjectSizes.measure(new NativeCell());
-
- NativeCell()
- {}
-
- public NativeCell(NativeAllocator allocator, OpOrder.Group writeOp, Cell copyOf)
- {
- super(allocator, writeOp, copyOf);
- }
-
- @Override
- public CellName name()
- {
- return this;
- }
-
- @Override
- public long timestamp()
- {
- return getLong(TIMESTAMP_OFFSET);
- }
-
- @Override
- public Cell localCopy(CFMetaData metadata, AbstractAllocator allocator)
- {
- return new BufferCell(copy(metadata, allocator), allocator.clone(value()), timestamp());
- }
-
- @Override
- public Cell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
- {
- return allocator.clone(this, metadata, opGroup);
- }
-
- @Override
- public void updateDigest(MessageDigest digest)
- {
- updateWithName(digest); // name
- updateWithValue(digest); // value
-
- FBUtilities.updateWithLong(digest, timestamp());
- FBUtilities.updateWithByte(digest, serializationFlags());
- }
-
- @Override
- public long unsharedHeapSizeExcludingData()
- {
- return SIZE;
- }
-
- @Override
- public long unsharedHeapSize()
- {
- return SIZE;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/NativeCounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeCounterCell.java b/src/java/org/apache/cassandra/db/NativeCounterCell.java
deleted file mode 100644
index c16cc44..0000000
--- a/src/java/org/apache/cassandra/db/NativeCounterCell.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-import org.apache.cassandra.utils.memory.NativeAllocator;
-
-public class NativeCounterCell extends NativeCell implements CounterCell
-{
- private static final long SIZE = ObjectSizes.measure(new NativeCounterCell());
-
- private NativeCounterCell()
- {}
-
- public NativeCounterCell(NativeAllocator allocator, OpOrder.Group writeOp, CounterCell copyOf)
- {
- super(allocator, writeOp, copyOf);
- }
-
- @Override
- protected void construct(Cell from)
- {
- super.construct(from);
- setLong(internalSize() - 8, ((CounterCell) from).timestampOfLastDelete());
- }
-
- @Override
- protected int postfixSize()
- {
- return 8;
- }
-
- @Override
- protected int sizeOf(Cell cell)
- {
- return 8 + super.sizeOf(cell);
- }
-
- @Override
- public long timestampOfLastDelete()
- {
- return getLong(internalSize() - 8);
- }
-
- @Override
- public long total()
- {
- return contextManager.total(value());
- }
-
- @Override
- public boolean hasLegacyShards()
- {
- return contextManager.hasLegacyShards(value());
- }
-
- @Override
- public Cell markLocalToBeCleared()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Cell diff(Cell cell)
- {
- return diffCounter(cell);
- }
-
- @Override
- public Cell reconcile(Cell cell)
- {
- return reconcileCounter(cell);
- }
-
- @Override
- public int serializationFlags()
- {
- return ColumnSerializer.COUNTER_MASK;
- }
-
- @Override
- public int cellDataSize()
- {
- // A counter column adds 8 bytes for timestampOfLastDelete to Cell.
- return super.cellDataSize() + TypeSizes.NATIVE.sizeof(timestampOfLastDelete());
- }
-
- @Override
- public int serializedSize(CellNameType type, TypeSizes typeSizes)
- {
- return super.serializedSize(type, typeSizes) + typeSizes.sizeof(timestampOfLastDelete());
- }
-
- @Override
- public void validateFields(CFMetaData metadata) throws MarshalException
- {
- validateName(metadata);
- // We cannot use the value validator as for other columns as the CounterColumnType validate a long,
- // which is not the internal representation of counters
- contextManager.validateContext(value());
- }
-
- /*
- * We have to special case digest creation for counter column because
- * we don't want to include the information about which shard of the
- * context is a delta or not, since this information differs from node to
- * node.
- */
- @Override
- public void updateDigest(MessageDigest digest)
- {
- updateWithName(digest);
-
- // We don't take the deltas into account in a digest
- contextManager.updateDigest(digest, value());
-
- FBUtilities.updateWithLong(digest, timestamp());
- FBUtilities.updateWithByte(digest, serializationFlags());
- FBUtilities.updateWithLong(digest, timestampOfLastDelete());
- }
-
- @Override
- public String getString(CellNameType comparator)
- {
- return String.format("%s(%s:false:%s@%d!%d)",
- getClass().getSimpleName(),
- comparator.getString(name()),
- contextManager.toString(value()),
- timestamp(),
- timestampOfLastDelete());
- }
-
- @Override
- public CounterCell localCopy(CFMetaData metadata, AbstractAllocator allocator)
- {
- return new BufferCounterCell(copy(metadata, allocator), allocator.clone(value()), timestamp(), timestampOfLastDelete());
- }
-
- @Override
- public CounterCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
- {
- return allocator.clone(this, metadata, opGroup);
- }
-
- @Override
- public long unsharedHeapSizeExcludingData()
- {
- return SIZE;
- }
-
- @Override
- public long unsharedHeapSize()
- {
- return SIZE;
- }
-
- @Override
- public boolean equals(Cell cell)
- {
- return super.equals(cell) && timestampOfLastDelete() == ((CounterCell) cell).timestampOfLastDelete();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/NativeDeletedCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeDeletedCell.java b/src/java/org/apache/cassandra/db/NativeDeletedCell.java
deleted file mode 100644
index 6bdef43..0000000
--- a/src/java/org/apache/cassandra/db/NativeDeletedCell.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemoryUtil;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-import org.apache.cassandra.utils.memory.NativeAllocator;
-
-public class NativeDeletedCell extends NativeCell implements DeletedCell
-{
- private static final long SIZE = ObjectSizes.measure(new NativeDeletedCell());
-
- private NativeDeletedCell()
- {}
-
- public NativeDeletedCell(NativeAllocator allocator, OpOrder.Group writeOp, DeletedCell copyOf)
- {
- super(allocator, writeOp, copyOf);
- }
-
- @Override
- public Cell reconcile(Cell cell)
- {
- if (cell instanceof DeletedCell)
- return super.reconcile(cell);
- return cell.reconcile(this);
- }
-
- @Override
- public boolean isLive()
- {
- return false;
- }
-
- @Override
- public boolean isLive(long now)
- {
- return false;
- }
-
- @Override
- public int getLocalDeletionTime()
- {
- int v = getInt(valueStartOffset());
- return MemoryUtil.INVERTED_ORDER ? Integer.reverseBytes(v) : v;
- }
-
- @Override
- public int serializationFlags()
- {
- return ColumnSerializer.DELETION_MASK;
- }
-
- @Override
- public void validateFields(CFMetaData metadata) throws MarshalException
- {
- validateName(metadata);
-
- if ((int) (internalSize() - valueStartOffset()) != 4)
- throw new MarshalException("A tombstone value should be 4 bytes long");
- if (getLocalDeletionTime() < 0)
- throw new MarshalException("The local deletion time should not be negative");
- }
-
- @Override
- public void updateDigest(MessageDigest digest)
- {
- updateWithName(digest);
- FBUtilities.updateWithLong(digest, timestamp());
- FBUtilities.updateWithByte(digest, serializationFlags());
- }
-
- @Override
- public DeletedCell localCopy(CFMetaData metadata, AbstractAllocator allocator)
- {
- return new BufferDeletedCell(copy(metadata, allocator), allocator.clone(value()), timestamp());
- }
-
- @Override
- public DeletedCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
- {
- return allocator.clone(this, metadata, opGroup);
- }
-
- @Override
- public long unsharedHeapSizeExcludingData()
- {
- return SIZE;
- }
-
- @Override
- public long unsharedHeapSize()
- {
- return SIZE;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/NativeExpiringCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeExpiringCell.java b/src/java/org/apache/cassandra/db/NativeExpiringCell.java
deleted file mode 100644
index 6369536..0000000
--- a/src/java/org/apache/cassandra/db/NativeExpiringCell.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-import org.apache.cassandra.utils.memory.NativeAllocator;
-
-public class NativeExpiringCell extends NativeCell implements ExpiringCell
-{
- private static final long SIZE = ObjectSizes.measure(new NativeExpiringCell());
-
- private NativeExpiringCell()
- {}
-
- public NativeExpiringCell(NativeAllocator allocator, OpOrder.Group writeOp, ExpiringCell copyOf)
- {
- super(allocator, writeOp, copyOf);
- }
-
- @Override
- protected int sizeOf(Cell cell)
- {
- return super.sizeOf(cell) + 8;
- }
-
- @Override
- protected void construct(Cell from)
- {
- ExpiringCell expiring = (ExpiringCell) from;
-
- setInt(internalSize() - 4, expiring.getTimeToLive());
- setInt(internalSize() - 8, expiring.getLocalDeletionTime());
- super.construct(from);
- }
-
- @Override
- protected int postfixSize()
- {
- return 8;
- }
-
- @Override
- public int getTimeToLive()
- {
- return getInt(internalSize() - 4);
- }
-
- @Override
- public int getLocalDeletionTime()
- {
- return getInt(internalSize() - 8);
- }
-
- @Override
- public boolean isLive()
- {
- return isLive(System.currentTimeMillis());
- }
-
- @Override
- public boolean isLive(long now)
- {
- return (int) (now / 1000) < getLocalDeletionTime();
- }
-
- @Override
- public int serializationFlags()
- {
- return ColumnSerializer.EXPIRATION_MASK;
- }
-
- @Override
- public int cellDataSize()
- {
- return super.cellDataSize() + TypeSizes.NATIVE.sizeof(getLocalDeletionTime()) + TypeSizes.NATIVE.sizeof(getTimeToLive());
- }
-
- @Override
- public int serializedSize(CellNameType type, TypeSizes typeSizes)
- {
- /*
- * An expired column adds to a Cell :
- * 4 bytes for the localExpirationTime
- * + 4 bytes for the timeToLive
- */
- return super.serializedSize(type, typeSizes) + typeSizes.sizeof(getLocalDeletionTime()) + typeSizes.sizeof(getTimeToLive());
- }
-
- @Override
- public void validateFields(CFMetaData metadata) throws MarshalException
- {
- super.validateFields(metadata);
-
- if (getTimeToLive() <= 0)
- throw new MarshalException("A column TTL should be > 0");
- if (getLocalDeletionTime() < 0)
- throw new MarshalException("The local expiration time should not be negative");
- }
-
- @Override
- public void updateDigest(MessageDigest digest)
- {
- super.updateDigest(digest);
- FBUtilities.updateWithInt(digest, getTimeToLive());
- }
-
- @Override
- public Cell reconcile(Cell cell)
- {
- long ts1 = timestamp(), ts2 = cell.timestamp();
- if (ts1 != ts2)
- return ts1 < ts2 ? cell : this;
- // we should prefer tombstones
- if (cell instanceof DeletedCell)
- return cell;
- int c = value().compareTo(cell.value());
- if (c != 0)
- return c < 0 ? cell : this;
- // If we have same timestamp and value, prefer the longest ttl
- if (cell instanceof ExpiringCell)
- {
- int let1 = getLocalDeletionTime(), let2 = cell.getLocalDeletionTime();
- if (let1 < let2)
- return cell;
- }
- return this;
- }
-
- public boolean equals(Cell cell)
- {
- if (!super.equals(cell))
- return false;
- ExpiringCell that = (ExpiringCell) cell;
- return getLocalDeletionTime() == that.getLocalDeletionTime() && getTimeToLive() == that.getTimeToLive();
- }
-
- @Override
- public String getString(CellNameType comparator)
- {
- return String.format("%s(%s!%d)", getClass().getSimpleName(), super.getString(comparator), getTimeToLive());
- }
-
- @Override
- public ExpiringCell localCopy(CFMetaData metadata, AbstractAllocator allocator)
- {
- return new BufferExpiringCell(name().copy(metadata, allocator), allocator.clone(value()), timestamp(), getTimeToLive(), getLocalDeletionTime());
- }
-
- @Override
- public ExpiringCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
- {
- return allocator.clone(this, metadata, opGroup);
- }
-
- @Override
- public long unsharedHeapSizeExcludingData()
- {
- return SIZE;
- }
-
- @Override
- public long unsharedHeapSize()
- {
- return SIZE;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java
deleted file mode 100644
index f5eddb9..0000000
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.*;
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.io.ISSTableSerializer;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.serializers.MarshalException;
-
-public interface OnDiskAtom
-{
- public Composite name();
-
- /**
- * For a standard column, this is the same as timestamp().
- * For a super column, this is the min/max column timestamp of the sub columns.
- */
- public long timestamp();
- public int getLocalDeletionTime(); // for tombstone GC, so int is sufficient granularity
-
- public void validateFields(CFMetaData metadata) throws MarshalException;
- public void updateDigest(MessageDigest digest);
-
- public static class Serializer implements ISSTableSerializer<OnDiskAtom>
- {
- private final CellNameType type;
-
- public Serializer(CellNameType type)
- {
- this.type = type;
- }
-
- public void serializeForSSTable(OnDiskAtom atom, DataOutputPlus out) throws IOException
- {
- if (atom instanceof Cell)
- {
- type.columnSerializer().serialize((Cell)atom, out);
- }
- else
- {
- assert atom instanceof RangeTombstone;
- type.rangeTombstoneSerializer().serializeForSSTable((RangeTombstone)atom, out);
- }
- }
-
- public OnDiskAtom deserializeFromSSTable(DataInput in, Version version) throws IOException
- {
- return deserializeFromSSTable(in, ColumnSerializer.Flag.LOCAL, Integer.MIN_VALUE, version);
- }
-
- public OnDiskAtom deserializeFromSSTable(DataInput in, ColumnSerializer.Flag flag, int expireBefore, Version version) throws IOException
- {
- Composite name = type.serializer().deserialize(in);
- if (name.isEmpty())
- {
- // SSTableWriter.END_OF_ROW
- return null;
- }
-
- int b = in.readUnsignedByte();
- if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
- return type.rangeTombstoneSerializer().deserializeBody(in, name, version);
- else
- return type.columnSerializer().deserializeColumnBody(in, (CellName)name, b, flag, expireBefore);
- }
-
- public long serializedSizeForSSTable(OnDiskAtom atom)
- {
- if (atom instanceof Cell)
- {
- return type.columnSerializer().serializedSize((Cell)atom, TypeSizes.NATIVE);
- }
- else
- {
- assert atom instanceof RangeTombstone;
- return type.rangeTombstoneSerializer().serializedSizeForSSTable((RangeTombstone)atom);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/PagedRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PagedRangeCommand.java b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
deleted file mode 100644
index 40ef88e..0000000
--- a/src/java/org/apache/cassandra/db/PagedRangeCommand.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-
-public class PagedRangeCommand extends AbstractRangeCommand
-{
- public static final IVersionedSerializer<PagedRangeCommand> serializer = new Serializer();
-
- public final Composite start;
- public final Composite stop;
- public final int limit;
- private final boolean countCQL3Rows;
-
- public PagedRangeCommand(String keyspace,
- String columnFamily,
- long timestamp,
- AbstractBounds<RowPosition> keyRange,
- SliceQueryFilter predicate,
- Composite start,
- Composite stop,
- List<IndexExpression> rowFilter,
- int limit,
- boolean countCQL3Rows)
- {
- super(keyspace, columnFamily, timestamp, keyRange, predicate, rowFilter);
- this.start = start;
- this.stop = stop;
- this.limit = limit;
- this.countCQL3Rows = countCQL3Rows;
- }
-
- public MessageOut<PagedRangeCommand> createMessage()
- {
- return new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, serializer);
- }
-
- public AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> subRange)
- {
- Composite newStart = subRange.left.equals(keyRange.left) ? start : ((SliceQueryFilter)predicate).start();
- Composite newStop = subRange.right.equals(keyRange.right) ? stop : ((SliceQueryFilter)predicate).finish();
- return new PagedRangeCommand(keyspace,
- columnFamily,
- timestamp,
- subRange,
- ((SliceQueryFilter) predicate).cloneShallow(),
- newStart,
- newStop,
- rowFilter,
- limit,
- countCQL3Rows);
- }
-
- public AbstractRangeCommand withUpdatedLimit(int newLimit)
- {
- return new PagedRangeCommand(keyspace,
- columnFamily,
- timestamp,
- keyRange,
- ((SliceQueryFilter) predicate).cloneShallow(),
- start,
- stop,
- rowFilter,
- newLimit,
- countCQL3Rows);
- }
-
- public int limit()
- {
- return limit;
- }
-
- public boolean countCQL3Rows()
- {
- return countCQL3Rows;
- }
-
- public List<Row> executeLocally()
- {
- ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
-
- ExtendedFilter exFilter = cfs.makeExtendedFilter(keyRange, (SliceQueryFilter)predicate, start, stop, rowFilter, limit, countCQL3Rows(), timestamp);
- if (cfs.indexManager.hasIndexFor(rowFilter))
- return cfs.search(exFilter);
- else
- return cfs.getRangeSlice(exFilter);
- }
-
- @Override
- public String toString()
- {
- return String.format("PagedRange(%s, %s, %d, %s, %s, %s, %s, %s, %d)", keyspace, columnFamily, timestamp, keyRange, predicate, start, stop, rowFilter, limit);
- }
-
- private static class Serializer implements IVersionedSerializer<PagedRangeCommand>
- {
- public void serialize(PagedRangeCommand cmd, DataOutputPlus out, int version) throws IOException
- {
- out.writeUTF(cmd.keyspace);
- out.writeUTF(cmd.columnFamily);
- out.writeLong(cmd.timestamp);
-
- MessagingService.validatePartitioner(cmd.keyRange);
- AbstractBounds.rowPositionSerializer.serialize(cmd.keyRange, out, version);
-
- CFMetaData metadata = Schema.instance.getCFMetaData(cmd.keyspace, cmd.columnFamily);
-
- // SliceQueryFilter (the count is not used)
- SliceQueryFilter filter = (SliceQueryFilter)cmd.predicate;
- metadata.comparator.sliceQueryFilterSerializer().serialize(filter, out, version);
-
- // The start and stop of the page
- metadata.comparator.serializer().serialize(cmd.start, out);
- metadata.comparator.serializer().serialize(cmd.stop, out);
-
- out.writeInt(cmd.rowFilter.size());
- for (IndexExpression expr : cmd.rowFilter)
- {
- expr.writeTo(out);;
- }
-
- out.writeInt(cmd.limit);
- if (version >= MessagingService.VERSION_21)
- out.writeBoolean(cmd.countCQL3Rows);
- }
-
- public PagedRangeCommand deserialize(DataInput in, int version) throws IOException
- {
- String keyspace = in.readUTF();
- String columnFamily = in.readUTF();
- long timestamp = in.readLong();
-
- AbstractBounds<RowPosition> keyRange =
- AbstractBounds.rowPositionSerializer.deserialize(in, MessagingService.globalPartitioner(), version);
-
- CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
- if (metadata == null)
- {
- String message = String.format("Got paged range command for nonexistent table %s.%s. If the table was just " +
- "created, this is likely due to the schema not being fully propagated. Please wait for schema " +
- "agreement on table creation." , keyspace, columnFamily);
- throw new UnknownColumnFamilyException(message, null);
- }
-
- SliceQueryFilter predicate = metadata.comparator.sliceQueryFilterSerializer().deserialize(in, version);
-
- Composite start = metadata.comparator.serializer().deserialize(in);
- Composite stop = metadata.comparator.serializer().deserialize(in);
-
- int filterCount = in.readInt();
- List<IndexExpression> rowFilter = new ArrayList<IndexExpression>(filterCount);
- for (int i = 0; i < filterCount; i++)
- {
- rowFilter.add(IndexExpression.readFrom(in));
- }
-
- int limit = in.readInt();
- boolean countCQL3Rows = version >= MessagingService.VERSION_21
- ? in.readBoolean()
- : predicate.compositesToGroup >= 0 || predicate.count != 1; // See #6857
- return new PagedRangeCommand(keyspace, columnFamily, timestamp, keyRange, predicate, start, stop, rowFilter, limit, countCQL3Rows);
- }
-
- public long serializedSize(PagedRangeCommand cmd, int version)
- {
- long size = 0;
-
- size += TypeSizes.NATIVE.sizeof(cmd.keyspace);
- size += TypeSizes.NATIVE.sizeof(cmd.columnFamily);
- size += TypeSizes.NATIVE.sizeof(cmd.timestamp);
-
- size += AbstractBounds.rowPositionSerializer.serializedSize(cmd.keyRange, version);
-
- CFMetaData metadata = Schema.instance.getCFMetaData(cmd.keyspace, cmd.columnFamily);
-
- size += metadata.comparator.sliceQueryFilterSerializer().serializedSize((SliceQueryFilter)cmd.predicate, version);
-
- size += metadata.comparator.serializer().serializedSize(cmd.start, TypeSizes.NATIVE);
- size += metadata.comparator.serializer().serializedSize(cmd.stop, TypeSizes.NATIVE);
-
- size += TypeSizes.NATIVE.sizeof(cmd.rowFilter.size());
- for (IndexExpression expr : cmd.rowFilter)
- {
- size += TypeSizes.NATIVE.sizeofWithShortLength(expr.column);
- size += TypeSizes.NATIVE.sizeof(expr.operator.ordinal());
- size += TypeSizes.NATIVE.sizeofWithShortLength(expr.value);
- }
-
- size += TypeSizes.NATIVE.sizeof(cmd.limit);
- if (version >= MessagingService.VERSION_21)
- size += TypeSizes.NATIVE.sizeof(cmd.countCQL3Rows);
- return size;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/PartitionColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionColumns.java b/src/java/org/apache/cassandra/db/PartitionColumns.java
new file mode 100644
index 0000000..a1b1d00
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/PartitionColumns.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.*;
+import java.security.MessageDigest;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.ColumnDefinition;
+
+/**
+ * Columns (or a subset of the columns) that a partition contains.
+ * This mainly groups both static and regular columns for convenience.
+ */
+public class PartitionColumns implements Iterable<ColumnDefinition>
+{
+ public static final PartitionColumns NONE = new PartitionColumns(Columns.NONE, Columns.NONE); // shared instance with no static and no regular columns; final so it cannot be reassigned
+
+ public final Columns statics;
+ public final Columns regulars;
+
+ public PartitionColumns(Columns statics, Columns regulars)
+ {
+ this.statics = statics;
+ this.regulars = regulars;
+ }
+
+ public static PartitionColumns of(ColumnDefinition column)
+ {
+ return new PartitionColumns(column.isStatic() ? Columns.of(column) : Columns.NONE,
+ column.isStatic() ? Columns.NONE : Columns.of(column));
+ }
+
+ public PartitionColumns without(ColumnDefinition column)
+ {
+ return new PartitionColumns(column.isStatic() ? statics.without(column) : statics,
+ column.isStatic() ? regulars : regulars.without(column));
+ }
+
+ public PartitionColumns withoutStatics()
+ {
+ return statics.isEmpty() ? this : new PartitionColumns(Columns.NONE, regulars);
+ }
+
+ public boolean isEmpty()
+ {
+ return statics.isEmpty() && regulars.isEmpty();
+ }
+
+ public boolean contains(ColumnDefinition column)
+ {
+ return column.isStatic() ? statics.contains(column) : regulars.contains(column);
+ }
+
+ public boolean includes(PartitionColumns columns)
+ {
+ return statics.contains(columns.statics) && regulars.contains(columns.regulars);
+ }
+
+ public Iterator<ColumnDefinition> iterator()
+ {
+ return Iterators.concat(statics.iterator(), regulars.iterator());
+ }
+
+ public Iterator<ColumnDefinition> selectOrderIterator()
+ {
+ return Iterators.concat(statics.selectOrderIterator(), regulars.selectOrderIterator());
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[").append(statics).append(" | ").append(regulars).append("]");
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if (!(other instanceof PartitionColumns))
+ return false;
+
+ PartitionColumns that = (PartitionColumns)other;
+ return this.statics.equals(that.statics)
+ && this.regulars.equals(that.regulars);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(statics, regulars);
+ }
+
+ public void digest(MessageDigest digest)
+ {
+ regulars.digest(digest);
+ statics.digest(digest);
+ }
+
+ public static Builder builder()
+ {
+ return new Builder();
+ }
+
+ public static class Builder
+ {
+ // Note that we do want to use sorted sets because we want the column definitions to be compared
+ // through compareTo, not equals. The former basically check it's the same column name, while the latter
+ // check it's the same object, including the same type.
+ private SortedSet<ColumnDefinition> regularColumns;
+ private SortedSet<ColumnDefinition> staticColumns;
+
+ public Builder add(ColumnDefinition c)
+ {
+ if (c.isStatic())
+ {
+ if (staticColumns == null)
+ staticColumns = new TreeSet<>();
+ staticColumns.add(c);
+ }
+ else
+ {
+ assert c.isRegular();
+ if (regularColumns == null)
+ regularColumns = new TreeSet<>();
+ regularColumns.add(c);
+ }
+ return this;
+ }
+
+ public int added()
+ {
+ return (regularColumns == null ? 0 : regularColumns.size())
+ + (staticColumns == null ? 0 : staticColumns.size());
+ }
+
+ public Builder addAll(Iterable<ColumnDefinition> columns)
+ {
+ for (ColumnDefinition c : columns)
+ add(c);
+ return this;
+ }
+
+ public Builder addAll(PartitionColumns columns)
+ {
+ if (regularColumns == null && !columns.regulars.isEmpty())
+ regularColumns = new TreeSet<>();
+
+ for (ColumnDefinition c : columns.regulars)
+ regularColumns.add(c);
+
+ if (staticColumns == null && !columns.statics.isEmpty())
+ staticColumns = new TreeSet<>();
+
+ for (ColumnDefinition c : columns.statics)
+ staticColumns.add(c);
+
+ return this;
+ }
+
+ public PartitionColumns build()
+ {
+ return new PartitionColumns(staticColumns == null ? Columns.NONE : Columns.from(staticColumns),
+ regularColumns == null ? Columns.NONE : Columns.from(regularColumns));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/PartitionPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionPosition.java b/src/java/org/apache/cassandra/db/PartitionPosition.java
new file mode 100644
index 0000000..1dc940e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/PartitionPosition.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public interface PartitionPosition extends RingPosition<PartitionPosition>
+{
+ public static enum Kind
+ {
+ // Only add new values to the end of the enum, the ordinal is used
+ // during serialization
+ ROW_KEY, MIN_BOUND, MAX_BOUND;
+
+ private static final Kind[] allKinds = Kind.values();
+
+ static Kind fromOrdinal(int ordinal)
+ {
+ return allKinds[ordinal];
+ }
+ }
+
+ public static final class ForKey
+ {
+ public static PartitionPosition get(ByteBuffer key, IPartitioner p)
+ {
+ return key == null || key.remaining() == 0 ? p.getMinimumToken().minKeyBound() : p.decorateKey(key);
+ }
+ }
+
+ public static final RowPositionSerializer serializer = new RowPositionSerializer();
+
+ public Kind kind();
+ public boolean isMinimum();
+
+ public static class RowPositionSerializer implements IPartitionerDependentSerializer<PartitionPosition>
+ {
+ /*
+ * We need to be able to serialize both Token.KeyBound and
+ * DecoratedKey. To make this compact, we first write a byte whose
+ * meaning is:
+ * - 0: DecoratedKey
+ * - 1: a 'minimum' Token.KeyBound
+ * - 2: a 'maximum' Token.KeyBound
+ * In the case of the DecoratedKey, we then serialize the key (the
+ * token is recreated on the other side). In the other cases, we then
+ * serialize the token.
+ */
+ public void serialize(PartitionPosition pos, DataOutputPlus out, int version) throws IOException
+ {
+ Kind kind = pos.kind();
+ out.writeByte(kind.ordinal());
+ if (kind == Kind.ROW_KEY)
+ ByteBufferUtil.writeWithShortLength(((DecoratedKey)pos).getKey(), out);
+ else
+ Token.serializer.serialize(pos.getToken(), out, version);
+ }
+
+ public PartitionPosition deserialize(DataInput in, IPartitioner p, int version) throws IOException
+ {
+ Kind kind = Kind.fromOrdinal(in.readByte()); // leading byte is the Kind ordinal written by serialize()
+ if (kind == Kind.ROW_KEY)
+ {
+ ByteBuffer k = ByteBufferUtil.readWithShortLength(in);
+ return p.decorateKey(k); // use the partitioner supplied by the caller, exactly as the token branch below does
+ }
+ else
+ {
+ Token t = Token.serializer.deserialize(in, p, version);
+ return kind == Kind.MIN_BOUND ? t.minKeyBound() : t.maxKeyBound();
+ }
+ }
+
+ public long serializedSize(PartitionPosition pos, int version)
+ {
+ Kind kind = pos.kind();
+ long size = 1; // 1 byte for enum; long to match the return type and avoid any implicit narrowing in the += below
+ if (kind == Kind.ROW_KEY)
+ {
+ int keySize = ((DecoratedKey)pos).getKey().remaining();
+ size += TypeSizes.NATIVE.sizeof((short) keySize) + keySize;
+ }
+ else
+ {
+ size += Token.serializer.serializedSize(pos.getToken(), version);
+ }
+ return size;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
new file mode 100644
index 0000000..c11a9be
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.index.SecondaryIndexSearcher;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.metrics.ColumnFamilyMetrics;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.service.pager.*;
+import org.apache.cassandra.thrift.ThriftResultsMerger;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A read command that selects a (part of a) range of partitions.
+ * <p>
+ * What is selected is described by a {@link DataRange}; all the other query parameters
+ * (column filter, row filter, limits, ...) are handed to {@link ReadCommand}.
+ */
+public class PartitionRangeReadCommand extends ReadCommand
+{
+    /** Deserializer for the selection part (the {@link DataRange}) of this kind of command. */
+    protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
+
+    /** The range of partitions (and the clustering filter within them) this command selects. */
+    private final DataRange dataRange;
+
+    /**
+     * Creates a new partition range read command.
+     *
+     * @param isDigest digest flag, forwarded to {@link ReadCommand}.
+     * @param isForThrift thrift flag, forwarded to {@link ReadCommand}; when set, the storage
+     * iterators built by this command are passed through {@code ThriftResultsMerger.maybeWrap}.
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use as "now" for this query.
+     * @param columnFilter the columns to query.
+     * @param rowFilter the row filter of the query.
+     * @param limits the limits of the query.
+     * @param dataRange the range of data to query.
+     */
+    public PartitionRangeReadCommand(boolean isDigest,
+                                     boolean isForThrift,
+                                     CFMetaData metadata,
+                                     int nowInSec,
+                                     ColumnFilter columnFilter,
+                                     RowFilter rowFilter,
+                                     DataLimits limits,
+                                     DataRange dataRange)
+    {
+        super(Kind.PARTITION_RANGE, isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+        this.dataRange = dataRange;
+    }
+
+    /**
+     * Creates a new partition range read command with both the digest and thrift flags set to {@code false}.
+     *
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use as "now" for this query.
+     * @param columnFilter the columns to query.
+     * @param rowFilter the row filter of the query.
+     * @param limits the limits of the query.
+     * @param dataRange the range of data to query.
+     */
+    public PartitionRangeReadCommand(CFMetaData metadata,
+                                     int nowInSec,
+                                     ColumnFilter columnFilter,
+                                     RowFilter rowFilter,
+                                     DataLimits limits,
+                                     DataRange dataRange)
+    {
+        this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange);
+    }
+
+    /**
+     * Creates a new read command that queries all the data in the table.
+     *
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use as "now" for this query.
+     *
+     * @return a newly created read command that queries everything in the table.
+     */
+    public static PartitionRangeReadCommand allDataRead(CFMetaData metadata, int nowInSec)
+    {
+        return new PartitionRangeReadCommand(metadata,
+                                             nowInSec,
+                                             ColumnFilter.all(metadata),
+                                             RowFilter.NONE,
+                                             DataLimits.NONE,
+                                             DataRange.allData(StorageService.getPartitioner()));
+    }
+
+    /**
+     * @return the data range this command selects.
+     */
+    public DataRange dataRange()
+    {
+        return dataRange;
+    }
+
+    /**
+     * @param key a partition key.
+     * @return the clustering index filter the data range of this command provides for {@code key}.
+     */
+    public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key)
+    {
+        return dataRange.clusteringIndexFilter(key);
+    }
+
+    /**
+     * @return whether the data range of this command is a names query.
+     */
+    public boolean isNamesQuery()
+    {
+        return dataRange.isNamesQuery();
+    }
+
+    /**
+     * Creates an equivalent command whose data range is the one of this command restricted
+     * through {@code DataRange.forSubRange}.
+     *
+     * @param range the sub-range of partition positions to use.
+     * @return the new command; all other parameters, including the digest and thrift flags, are kept.
+     */
+    public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range)
+    {
+        return new PartitionRangeReadCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range));
+    }
+
+    /**
+     * @return a new command built from the same parameters as this one.
+     */
+    public PartitionRangeReadCommand copy()
+    {
+        return new PartitionRangeReadCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange());
+    }
+
+    /**
+     * Creates an equivalent command that uses the provided limits.
+     *
+     * @param newLimits the limits to use for the new command.
+     * @return the new command; all other parameters, including the digest and thrift flags, are kept.
+     */
+    public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
+    {
+        // Carry over the digest and thrift flags, as forSubRange() and copy() do: the shorter
+        // constructor would silently reset both to false.
+        return new PartitionRangeReadCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange());
+    }
+
+    /**
+     * @return the range RPC timeout configured in {@link DatabaseDescriptor}.
+     */
+    public long getTimeout()
+    {
+        return DatabaseDescriptor.getRangeRpcTimeout();
+    }
+
+    /**
+     * Whether this command selects the provided clustering of the provided partition.
+     *
+     * @param partitionKey the partition key.
+     * @param clustering the clustering within that partition.
+     * @return {@code false} if the data range does not contain the partition. For the static
+     * clustering, whether any static column is fetched. Otherwise whether the clustering
+     * index filter for that partition selects the clustering.
+     */
+    public boolean selects(DecoratedKey partitionKey, Clustering clustering)
+    {
+        if (!dataRange().contains(partitionKey))
+            return false;
+
+        if (clustering == Clustering.STATIC_CLUSTERING)
+            return !columnFilter().fetchedColumns().statics.isEmpty();
+
+        return dataRange().clusteringIndexFilter(partitionKey).selects(clustering);
+    }
+
+    /**
+     * Executes this command through {@code StorageProxy.getRangeSlice}.
+     *
+     * @param consistency the consistency level to use.
+     * @param clientState not used by this implementation.
+     * @return the result of the query.
+     */
+    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+    {
+        return StorageProxy.getRangeSlice(this, consistency);
+    }
+
+    /**
+     * @param pagingState the paging state to start the pager from.
+     * @return a {@link RangeNamesQueryPager} if this is a names query, a {@link RangeSliceQueryPager} otherwise.
+     */
+    public QueryPager getPager(PagingState pagingState)
+    {
+        if (isNamesQuery())
+            return new RangeNamesQueryPager(this, pagingState);
+        else
+            return new RangeSliceQueryPager(this, pagingState);
+    }
+
+    /**
+     * Adds the provided latency (in nanoseconds) to the range latency metric.
+     */
+    protected void recordLatency(ColumnFamilyMetrics metric, long latencyNanos)
+    {
+        metric.rangeLatency.addNano(latencyNanos);
+    }
+
+    /**
+     * Builds the iterator over the local data selected by this command.
+     * <p>
+     * One iterator is created per memtable and per sstable of the view covering the queried
+     * key range; they are merged lazily and the result is wrapped by {@code checkCacheFilter}.
+     * If a {@code RuntimeException} or {@code Error} is thrown while doing so, the iterators
+     * collected so far are closed before the exception is rethrown.
+     */
+    protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
+    {
+        ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(dataRange().keyRange()));
+        Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator()));
+
+        // fetch data from current memtable, historical memtables, and SSTables in the correct order.
+        final List<UnfilteredPartitionIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
+
+        try
+        {
+            for (Memtable memtable : view.memtables)
+            {
+                @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
+                UnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange(), isForThrift());
+                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
+            }
+
+            for (SSTableReader sstable : view.sstables)
+            {
+                @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
+                UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), isForThrift());
+                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
+            }
+
+            return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators, nowInSec()), cfs);
+        }
+        catch (RuntimeException | Error e)
+        {
+            try
+            {
+                FBUtilities.closeAll(iterators);
+            }
+            catch (Exception suppressed)
+            {
+                // Don't let a failure to close hide the original problem.
+                e.addSuppressed(suppressed);
+            }
+
+            throw e;
+        }
+    }
+
+    /**
+     * Wraps the provided iterator so that, for each partition, the cached partition is used
+     * instead of the underlying row iterator when it fully covers the filter of this query.
+     */
+    private UnfilteredPartitionIterator checkCacheFilter(UnfilteredPartitionIterator iter, final ColumnFamilyStore cfs)
+    {
+        return new WrappingUnfilteredPartitionIterator(iter)
+        {
+            @Override
+            public UnfilteredRowIterator computeNext(UnfilteredRowIterator iter)
+            {
+                // Note that we rely on the fact that until we actually advance 'iter', no really costly operation is actually done
+                // (except for reading the partition key from the index file) due to the call to mergeLazily in queryStorage.
+                DecoratedKey dk = iter.partitionKey();
+
+                // Check if this partition is in the rowCache and if it is, if it covers our filter
+                CachedPartition cached = cfs.getRawCachedPartition(dk);
+                ClusteringIndexFilter filter = dataRange().clusteringIndexFilter(dk);
+
+                if (cached != null && cfs.isFilterFullyCoveredBy(filter, limits(), cached, nowInSec()))
+                {
+                    // We won't use 'iter' so close it now.
+                    iter.close();
+
+                    return filter.getUnfilteredRowIterator(columnFilter(), cached);
+                }
+
+                return iter;
+            }
+        };
+    }
+
+    /**
+     * Appends the {@code WHERE} clause of the CQL representation of this command to {@code sb}.
+     * Nothing is appended when the data range is unrestricted and the row filter is empty.
+     */
+    protected void appendCQLWhereClause(StringBuilder sb)
+    {
+        if (dataRange.isUnrestricted() && rowFilter().isEmpty())
+            return;
+
+        sb.append(" WHERE ");
+        // We put the row filter first because the data range can end by "ORDER BY"
+        if (!rowFilter().isEmpty())
+        {
+            sb.append(rowFilter());
+            if (!dataRange.isUnrestricted())
+                sb.append(" AND ");
+        }
+        if (!dataRange.isUnrestricted())
+            sb.append(dataRange.toCQLString(metadata()));
+    }
+
+    /**
+     * Allow to post-process the result of the query after it has been reconciled on the coordinator
+     * but before it is passed to the CQL layer to return the ResultSet.
+     *
+     * See CASSANDRA-8717 for why this exists.
+     *
+     * @param result the reconciled result.
+     * @return {@code result} unchanged if there is no index searcher for this command, otherwise
+     * the result of that searcher's own post-reconciliation processing.
+     */
+    public PartitionIterator postReconciliationProcessing(PartitionIterator result)
+    {
+        ColumnFamilyStore cfs = Keyspace.open(metadata().ksName).getColumnFamilyStore(metadata().cfName);
+        SecondaryIndexSearcher searcher = getIndexSearcher(cfs);
+        return searcher == null ? result : searcher.postReconciliationProcessing(rowFilter(), result);
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("Read(%s.%s columns=%s rowfilter=%s limits=%s %s)",
+                             metadata().ksName,
+                             metadata().cfName,
+                             columnFilter(),
+                             rowFilter(),
+                             limits(),
+                             dataRange().toString(metadata()));
+    }
+
+    /**
+     * Serializes the selection part of this command, i.e. its data range.
+     */
+    protected void serializeSelection(DataOutputPlus out, int version) throws IOException
+    {
+        DataRange.serializer.serialize(dataRange(), out, version, metadata());
+    }
+
+    /**
+     * @return the serialized size of the selection part of this command, i.e. of its data range.
+     */
+    protected long selectionSerializedSize(int version)
+    {
+        return DataRange.serializer.serializedSize(dataRange(), version, metadata());
+    }
+
+    /**
+     * Reads the data range from the input and builds the command from it and from the
+     * already deserialized parameters it is given.
+     */
+    private static class Deserializer extends SelectionDeserializer
+    {
+        public ReadCommand deserialize(DataInput in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+        throws IOException
+        {
+            DataRange range = DataRange.serializer.deserialize(in, version, metadata);
+            return new PartitionRangeReadCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range);
+        }
+    }
+}