You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:48:07 UTC
[43/51] [partial] cassandra git commit: Storage engine refactor,
a.k.a CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
new file mode 100644
index 0000000..73cedb8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.*;
+
+import org.apache.cassandra.cache.IMeasurableMemory;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * A clustering prefix is basically the unit of what a {@link ClusteringComparator} can compare.
+ * <p>
+ * It holds values for the clustering columns of a table (potentially only a prefix of all of them) and it has
+ * a "kind" that allows us to implement slices with inclusive and exclusive bounds.
+ * <p>
+ * In practice, {@code ClusteringPrefix} is just the part common to its 2 main subtypes: {@link Clustering} and
+ * {@link Slice.Bound}, where:
+ * 1) {@code Clustering} represents the clustering values for a row, i.e. the values for its clustering columns.
+ * 2) {@code Slice.Bound} represents a bound (start or end) of a slice (of rows).
+ * See those classes for more details.
+ */
+public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurableMemory, Clusterable
+{
+ public static final Serializer serializer = new Serializer();
+
+ /**
+ * The kind of clustering prefix this actually is.
+ *
+ * The kind {@code STATIC_CLUSTERING} is only implemented by {@link Clustering.STATIC_CLUSTERING} and {@code CLUSTERING} is
+ * implemented by the {@link Clustering} class. The rest is used by {@link Slice.Bound} and {@link RangeTombstone.Bound}.
+ */
+ public enum Kind
+ {
+ // WARNING: the ordering of that enum matters because we use ordinal() in the serialization
+
+ EXCL_END_BOUND(0, -1),
+ INCL_START_BOUND(1, -1),
+ EXCL_END_INCL_START_BOUNDARY(1, -1),
+ STATIC_CLUSTERING(2, -1),
+ CLUSTERING(3, 0),
+ INCL_END_EXCL_START_BOUNDARY(4, 1), // ranks after CLUSTERING (4 > 3), so the prefix result is 1 like INCL_END_BOUND
+ INCL_END_BOUND(4, 1),
+ EXCL_START_BOUND(5, 1);
+
+ private final int comparison;
+
+ // If clusterable c1 has this Kind and is a strict prefix of clusterable c2, then this
+ // is the result of compare(c1, c2). Basically, this is the same as comparing the kind of c1 to
+ // CLUSTERING, i.e. it must equal the sign of (this.comparison - CLUSTERING.comparison).
+ public final int prefixComparisonResult;
+
+ private Kind(int comparison, int prefixComparisonResult)
+ {
+ this.comparison = comparison;
+ this.prefixComparisonResult = prefixComparisonResult;
+ }
+
+ /**
+ * Compares the 2 provided kinds by their {@code comparison} rank (not by ordinal).
+ * <p>
+ * Note: this should be used instead of {@link #compareTo} when comparing clustering prefixes. We do
+ * not override that latter method because it is final for an enum.
+ */
+ public static int compare(Kind k1, Kind k2)
+ {
+ return Integer.compare(k1.comparison, k2.comparison);
+ }
+
+ /**
+ * Returns the inverse of the current kind.
+ * <p>
+ * This inverts both start into end (and vice-versa) and inclusive into exclusive (and vice-versa).
+ *
+ * @return the inverse of this kind. For instance, if this kind is an exclusive start, this returns
+ * an inclusive end. Kinds that are neither a bound nor a boundary are returned unchanged.
+ */
+ public Kind invert()
+ {
+ switch (this)
+ {
+ case EXCL_START_BOUND: return INCL_END_BOUND;
+ case INCL_START_BOUND: return EXCL_END_BOUND;
+ case EXCL_END_BOUND: return INCL_START_BOUND;
+ case INCL_END_BOUND: return EXCL_START_BOUND;
+ case EXCL_END_INCL_START_BOUNDARY: return INCL_END_EXCL_START_BOUNDARY;
+ case INCL_END_EXCL_START_BOUNDARY: return EXCL_END_INCL_START_BOUNDARY;
+ default: return this;
+ }
+ }
+
+ public boolean isBound()
+ {
+ switch (this)
+ {
+ case INCL_START_BOUND:
+ case INCL_END_BOUND:
+ case EXCL_START_BOUND:
+ case EXCL_END_BOUND:
+ return true;
+ }
+ return false;
+ }
+
+ public boolean isBoundary()
+ {
+ switch (this)
+ {
+ case INCL_END_EXCL_START_BOUNDARY:
+ case EXCL_END_INCL_START_BOUNDARY:
+ return true;
+ }
+ return false;
+ }
+
+ public boolean isStart()
+ {
+ switch (this)
+ {
+ case INCL_START_BOUND:
+ case EXCL_END_INCL_START_BOUNDARY:
+ case INCL_END_EXCL_START_BOUNDARY:
+ case EXCL_START_BOUND:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ public boolean isEnd()
+ {
+ switch (this)
+ {
+ case INCL_END_BOUND:
+ case EXCL_END_INCL_START_BOUNDARY:
+ case INCL_END_EXCL_START_BOUNDARY:
+ case EXCL_END_BOUND:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ public boolean isOpen(boolean reversed)
+ {
+ return reversed ? isEnd() : isStart();
+ }
+
+ public boolean isClose(boolean reversed)
+ {
+ return reversed ? isStart() : isEnd();
+ }
+
+ public Kind closeBoundOfBoundary(boolean reversed)
+ {
+ assert isBoundary();
+ return reversed
+ ? (this == INCL_END_EXCL_START_BOUNDARY ? EXCL_START_BOUND : INCL_START_BOUND)
+ : (this == INCL_END_EXCL_START_BOUNDARY ? INCL_END_BOUND : EXCL_END_BOUND);
+ }
+
+ public Kind openBoundOfBoundary(boolean reversed)
+ {
+ assert isBoundary();
+ return reversed
+ ? (this == INCL_END_EXCL_START_BOUNDARY ? INCL_END_BOUND : EXCL_END_BOUND)
+ : (this == INCL_END_EXCL_START_BOUNDARY ? EXCL_START_BOUND : INCL_START_BOUND);
+ }
+ }
+
+ public Kind kind();
+
+ /**
+ * The number of values in this prefix.
+ *
+ * There can't be more values than the table this is a prefix of has clustering columns.
+ *
+ * @return the number of values in this prefix.
+ */
+ public int size();
+
+ /**
+ * Retrieves the ith value of this prefix.
+ *
+ * @param i the index of the value to retrieve. Must be such that {@code 0 <= i < size()}.
+ *
+ * @return the ith value of this prefix. Note that a value can be {@code null}.
+ */
+ public ByteBuffer get(int i);
+
+ public void digest(MessageDigest digest);
+
+ // Used to verify if batches go over a given size
+ public int dataSize();
+
+ public String toString(CFMetaData metadata);
+
+ public void writeTo(Writer writer);
+
+ /**
+ * The values of this prefix as an array.
+ * <p>
+ * Please note that this may or may not require an array creation. So 1) you should *not*
+ * modify the returned array and 2) it's more efficient to use {@link #size()} and
+ * {@link #get} unless you actually need an array.
+ *
+ * @return the values for this prefix as an array.
+ */
+ public ByteBuffer[] getRawValues();
+
+ /**
+ * Interface for writing a clustering prefix.
+ * <p>
+ * Each value for the prefix should simply be written in order.
+ */
+ public interface Writer
+ {
+ /**
+ * Write the next value to the writer.
+ *
+ * @param value the value to write.
+ */
+ public void writeClusteringValue(ByteBuffer value);
+ }
+
+ public static class Serializer
+ {
+ public void serialize(ClusteringPrefix clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
+ {
+ // We shouldn't serialize static clusterings
+ assert clustering.kind() != Kind.STATIC_CLUSTERING;
+ if (clustering.kind() == Kind.CLUSTERING)
+ {
+ out.writeByte(clustering.kind().ordinal());
+ Clustering.serializer.serialize((Clustering)clustering, out, version, types);
+ }
+ else
+ {
+ Slice.Bound.serializer.serialize((Slice.Bound)clustering, out, version, types);
+ }
+ }
+
+ public ClusteringPrefix deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException
+ {
+ Kind kind = Kind.values()[in.readByte()];
+ // We shouldn't serialize static clusterings
+ assert kind != Kind.STATIC_CLUSTERING;
+ if (kind == Kind.CLUSTERING)
+ return Clustering.serializer.deserialize(in, version, types);
+ else
+ return Slice.Bound.serializer.deserializeValues(in, kind, version, types);
+ }
+
+ public long serializedSize(ClusteringPrefix clustering, int version, List<AbstractType<?>> types, TypeSizes sizes)
+ {
+ // We shouldn't serialize static clusterings
+ assert clustering.kind() != Kind.STATIC_CLUSTERING;
+ if (clustering.kind() == Kind.CLUSTERING)
+ return 1 + Clustering.serializer.serializedSize((Clustering)clustering, version, types, sizes);
+ else
+ return Slice.Bound.serializer.serializedSize((Slice.Bound)clustering, version, types, sizes);
+ }
+
+ void serializeValuesWithoutSize(ClusteringPrefix clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
+ {
+ if (clustering.size() == 0)
+ return;
+
+ writeHeader(clustering, out);
+ for (int i = 0; i < clustering.size(); i++)
+ {
+ ByteBuffer v = clustering.get(i);
+ if (v == null || !v.hasRemaining())
+ continue; // handled in the header
+
+ types.get(i).writeValue(v, out);
+ }
+ }
+
+ long valuesWithoutSizeSerializedSize(ClusteringPrefix clustering, int version, List<AbstractType<?>> types, TypeSizes sizes)
+ {
+ if (clustering.size() == 0)
+ return 0;
+
+ long size = headerBytesCount(clustering.size());
+ for (int i = 0; i < clustering.size(); i++)
+ {
+ ByteBuffer v = clustering.get(i);
+ if (v == null || !v.hasRemaining())
+ continue; // handled in the header
+
+ size += types.get(i).writtenLength(v, sizes);
+ }
+ return size;
+ }
+
+ void deserializeValuesWithoutSize(DataInput in, int size, int version, List<AbstractType<?>> types, ClusteringPrefix.Writer writer) throws IOException
+ {
+ if (size == 0)
+ return;
+
+ int[] header = readHeader(size, in);
+ for (int i = 0; i < size; i++)
+ {
+ if (isNull(header, i))
+ writer.writeClusteringValue(null);
+ else if (isEmpty(header, i))
+ writer.writeClusteringValue(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+ else
+ writer.writeClusteringValue(types.get(i).readValue(in));
+ }
+ }
+
+ private int headerBytesCount(int size)
+ {
+ // For each component, we store 2 bit to know if the component is empty or null (or neither).
+ // We thus handle 4 component per byte
+ return size / 4 + (size % 4 == 0 ? 0 : 1);
+ }
+
+ /**
+ * Whatever the type of a given clustering column is, its value can always be either empty or null. So we at least need to distinguish those
+ * 2 values, and because we want to be able to store fixed width values without appending their (fixed) size first, we need a way to encode
+ * empty values too. So for that, every clustering prefix includes a "header" that contains 2 bits per element in the prefix. For each element,
+ * those 2 bits encode whether the element is null, empty, or none of those.
+ */
+ private void writeHeader(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
+ {
+ int nbBytes = headerBytesCount(clustering.size());
+ for (int i = 0; i < nbBytes; i++)
+ {
+ int b = 0; // header byte i, covering components 4*i .. 4*i+3
+ for (int j = 0; j < 4; j++)
+ {
+ int c = i * 4 + j;
+ if (c >= clustering.size())
+ break; // the last header byte may be only partially used
+
+ ByteBuffer v = clustering.get(c);
+ if (v == null)
+ b |= (1 << (j * 2) + 1); // '+' binds tighter than '<<': sets bit 2*j+1 (the "null" flag)
+ else if (!v.hasRemaining())
+ b |= (1 << (j * 2)); // sets bit 2*j (the "empty" flag)
+ }
+ out.writeByte((byte)b);
+ }
+ }
+
+ private int[] readHeader(int size, DataInput in) throws IOException
+ {
+ int nbBytes = headerBytesCount(size);
+ int[] header = new int[nbBytes];
+ for (int i = 0; i < nbBytes; i++)
+ header[i] = in.readUnsignedByte();
+ return header;
+ }
+
+ private boolean isNull(int[] header, int i)
+ {
+ int b = header[i / 4]; // 4 components are packed per header entry
+ int mask = 1 << ((i % 4) * 2) + 1; // '+' binds tighter than '<<': tests bit 2*(i%4)+1, the bit writeHeader sets for null
+ return (b & mask) != 0;
+ }
+
+ private boolean isEmpty(int[] header, int i)
+ {
+ int b = header[i / 4]; // 4 components are packed per header entry
+ int mask = 1 << ((i % 4) * 2); // tests bit 2*(i%4), the bit writeHeader sets for an empty (zero-remaining) value
+ return (b & mask) != 0;
+ }
+ }
+
+ /**
+ * Helper class that makes the deserialization of clustering prefixes faster.
+ * <p>
+ * The main reason for this is that when we deserialize rows from sstables, there are many cases where we have
+ * a bunch of rows to skip at the beginning of an index block because those rows are before the requested slice.
+ * This class makes sure we can answer the question "is the next row on disk before the requested slice" with as
+ * little work as possible. It does that by providing a comparison method that deserializes only what is needed
+ * to decide the outcome of the comparison.
+ */
+ public static class Deserializer
+ {
+ private final ClusteringComparator comparator;
+ private final DataInput in;
+ private final SerializationHeader serializationHeader;
+
+ private boolean nextIsRow;
+ private int[] nextHeader;
+
+ private int nextSize;
+ private ClusteringPrefix.Kind nextKind;
+ private int deserializedSize;
+ private final ByteBuffer[] nextValues;
+
+ public Deserializer(ClusteringComparator comparator, DataInput in, SerializationHeader header)
+ {
+ this.comparator = comparator;
+ this.in = in;
+ this.serializationHeader = header;
+ this.nextValues = new ByteBuffer[comparator.size()];
+ }
+
+ public void prepare(int flags) throws IOException
+ {
+ assert !UnfilteredSerializer.isStatic(flags) : "Flags = " + flags;
+ this.nextIsRow = UnfilteredSerializer.kind(flags) == Unfiltered.Kind.ROW;
+ this.nextKind = nextIsRow ? Kind.CLUSTERING : ClusteringPrefix.Kind.values()[in.readByte()];
+ this.nextSize = nextIsRow ? comparator.size() : in.readUnsignedShort();
+ this.nextHeader = serializer.readHeader(nextSize, in);
+ this.deserializedSize = 0;
+ }
+
+ public int compareNextTo(Slice.Bound bound) throws IOException
+ {
+ if (bound == Slice.Bound.TOP)
+ return -1; // the next prefix is always reported as smaller than TOP
+
+ for (int i = 0; i < bound.size(); i++)
+ {
+ if (!hasComponent(i)) // lazily deserializes values up to index i; false once i >= nextSize
+ return nextKind.prefixComparisonResult;
+
+ int cmp = comparator.compareComponent(i, nextValues[i], bound.get(i));
+ if (cmp != 0)
+ return cmp;
+ }
+
+ if (bound.size() == nextSize)
+ return Kind.compare(nextKind, bound.kind()); // not compareTo(): that orders by ordinal, not by 'comparison' rank
+
+ // We know that we'll have exited already if nextSize < bound.size
+ return -bound.kind().prefixComparisonResult;
+ }
+
+ private boolean hasComponent(int i) throws IOException
+ {
+ if (i >= nextSize)
+ return false;
+
+ while (deserializedSize <= i)
+ deserializeOne();
+
+ return true;
+ }
+
+ private boolean deserializeOne() throws IOException
+ {
+ if (deserializedSize == nextSize)
+ return false;
+
+ int i = deserializedSize++;
+ nextValues[i] = serializer.isNull(nextHeader, i)
+ ? null
+ : (serializer.isEmpty(nextHeader, i) ? ByteBufferUtil.EMPTY_BYTE_BUFFER : serializationHeader.clusteringTypes().get(i).readValue(in));
+ return true;
+ }
+
+ private void deserializeAll() throws IOException
+ {
+ while (deserializeOne())
+ continue;
+ }
+
+ public RangeTombstone.Bound.Kind deserializeNextBound(RangeTombstone.Bound.Writer writer) throws IOException
+ {
+ assert !nextIsRow;
+ deserializeAll();
+ for (int i = 0; i < nextSize; i++)
+ writer.writeClusteringValue(nextValues[i]);
+ writer.writeBoundKind(nextKind);
+ return nextKind;
+ }
+
+ public void deserializeNextClustering(Clustering.Writer writer) throws IOException
+ {
+ assert nextIsRow && nextSize == nextValues.length;
+ deserializeAll();
+ for (int i = 0; i < nextSize; i++)
+ writer.writeClusteringValue(nextValues[i]);
+ }
+
+ public ClusteringPrefix.Kind skipNext() throws IOException
+ {
+ for (int i = deserializedSize; i < nextSize; i++)
+ if (!serializer.isNull(nextHeader, i) && !serializer.isEmpty(nextHeader, i))
+ serializationHeader.clusteringTypes().get(i).skipValue(in);
+ return nextKind;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
deleted file mode 100644
index 7f6d439..0000000
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.Closeable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.TreeSet;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.marshal.CounterColumnType;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.SearchIterator;
-import org.apache.cassandra.utils.memory.HeapAllocator;
-
-public class CollationController
-{
- private final ColumnFamilyStore cfs;
- private final QueryFilter filter;
- private final int gcBefore;
-
- private int sstablesIterated = 0;
-
- public CollationController(ColumnFamilyStore cfs, QueryFilter filter, int gcBefore)
- {
- this.cfs = cfs;
- this.filter = filter;
- this.gcBefore = gcBefore;
- }
-
- public ColumnFamily getTopLevelColumns(boolean copyOnHeap)
- {
- return filter.filter instanceof NamesQueryFilter
- && cfs.metadata.getDefaultValidator() != CounterColumnType.instance
- ? collectTimeOrderedData(copyOnHeap)
- : collectAllData(copyOnHeap);
- }
-
- /**
- * Collects data in order of recency, using the sstable maxtimestamp data.
- * Once we have data for all requests columns that is newer than the newest remaining maxtimestamp,
- * we stop.
- */
- private ColumnFamily collectTimeOrderedData(boolean copyOnHeap)
- {
- final ColumnFamily container = ArrayBackedSortedColumns.factory.create(cfs.metadata, filter.filter.isReversed());
- List<OnDiskAtomIterator> iterators = new ArrayList<>();
- boolean isEmpty = true;
- Tracing.trace("Acquiring sstable references");
- ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(filter.key));
- DeletionInfo returnDeletionInfo = container.deletionInfo();
-
- try
- {
- Tracing.trace("Merging memtable contents");
- for (Memtable memtable : view.memtables)
- {
- ColumnFamily cf = memtable.getColumnFamily(filter.key);
- if (cf != null)
- {
- filter.delete(container.deletionInfo(), cf);
- isEmpty = false;
- Iterator<Cell> iter = filter.getIterator(cf);
- while (iter.hasNext())
- {
- Cell cell = iter.next();
- if (copyOnHeap)
- cell = cell.localCopy(cfs.metadata, HeapAllocator.instance);
- container.addColumn(cell);
- }
- }
- }
-
- // avoid changing the filter columns of the original filter
- // (reduceNameFilter removes columns that are known to be irrelevant)
- NamesQueryFilter namesFilter = (NamesQueryFilter) filter.filter;
- TreeSet<CellName> filterColumns = new TreeSet<>(namesFilter.columns);
- QueryFilter reducedFilter = new QueryFilter(filter.key, filter.cfName, namesFilter.withUpdatedColumns(filterColumns), filter.timestamp);
-
- /* add the SSTables on disk */
- Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
-
- // read sorted sstables
- for (SSTableReader sstable : view.sstables)
- {
- // if we've already seen a row tombstone with a timestamp greater
- // than the most recent update to this sstable, we're done, since the rest of the sstables
- // will also be older
- if (sstable.getMaxTimestamp() < returnDeletionInfo.getTopLevelDeletion().markedForDeleteAt)
- break;
-
- long currentMaxTs = sstable.getMaxTimestamp();
- reduceNameFilter(reducedFilter, container, currentMaxTs);
- if (((NamesQueryFilter) reducedFilter.filter).columns.isEmpty())
- break;
-
- Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation);
- sstable.incrementReadCount();
- OnDiskAtomIterator iter = reducedFilter.getSSTableColumnIterator(sstable);
- iterators.add(iter);
- isEmpty = false;
- if (iter.getColumnFamily() != null)
- {
- container.delete(iter.getColumnFamily());
- sstablesIterated++;
- while (iter.hasNext())
- container.addAtom(iter.next());
- }
- }
-
- // we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently)
- // and "there used to be data, but it's gone now" (we should cache the empty CF so we don't need to rebuild that slower)
- if (isEmpty)
- return null;
-
- // do a final collate. toCollate is boilerplate required to provide a CloseableIterator
- ColumnFamily returnCF = container.cloneMeShallow();
- Tracing.trace("Collating all results");
- filter.collateOnDiskAtom(returnCF, container.iterator(), gcBefore);
-
- // "hoist up" the requested data into a more recent sstable
- if (sstablesIterated > cfs.getMinimumCompactionThreshold()
- && !cfs.isAutoCompactionDisabled()
- && cfs.getCompactionStrategyManager().shouldDefragment())
- {
- // !!WARNING!! if we stop copying our data to a heap-managed object,
- // we will need to track the lifetime of this mutation as well
- Tracing.trace("Defragmenting requested data");
- final Mutation mutation = new Mutation(cfs.keyspace.getName(), filter.key.getKey(), returnCF.cloneMe());
- StageManager.getStage(Stage.MUTATION).execute(new Runnable()
- {
- public void run()
- {
- // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
- Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
- }
- });
- }
-
- // Caller is responsible for final removeDeletedCF. This is important for cacheRow to work correctly:
- return returnCF;
- }
- finally
- {
- for (OnDiskAtomIterator iter : iterators)
- FileUtils.closeQuietly(iter);
- }
- }
-
- /**
- * remove columns from @param filter where we already have data in @param container newer than @param sstableTimestamp
- */
- private void reduceNameFilter(QueryFilter filter, ColumnFamily container, long sstableTimestamp)
- {
- if (container == null)
- return;
-
- SearchIterator<CellName, Cell> searchIter = container.searchIterator();
- for (Iterator<CellName> iterator = ((NamesQueryFilter) filter.filter).columns.iterator(); iterator.hasNext() && searchIter.hasNext(); )
- {
- CellName filterColumn = iterator.next();
- Cell cell = searchIter.next(filterColumn);
- if (cell != null && cell.timestamp() > sstableTimestamp)
- iterator.remove();
- }
- }
-
- /**
- * Collects data the brute-force way: gets an iterator for the filter in question
- * from every memtable and sstable, then merges them together.
- */
- private ColumnFamily collectAllData(boolean copyOnHeap)
- {
- Tracing.trace("Acquiring sstable references");
- ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(filter.key));
- List<Iterator<? extends OnDiskAtom>> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
- ColumnFamily returnCF = ArrayBackedSortedColumns.factory.create(cfs.metadata, filter.filter.isReversed());
- DeletionInfo returnDeletionInfo = returnCF.deletionInfo();
- try
- {
- Tracing.trace("Merging memtable tombstones");
- for (Memtable memtable : view.memtables)
- {
- final ColumnFamily cf = memtable.getColumnFamily(filter.key);
- if (cf != null)
- {
- filter.delete(returnDeletionInfo, cf);
- Iterator<Cell> iter = filter.getIterator(cf);
- if (copyOnHeap)
- {
- iter = Iterators.transform(iter, new Function<Cell, Cell>()
- {
- public Cell apply(Cell cell)
- {
- return cell.localCopy(cf.metadata, HeapAllocator.instance);
- }
- });
- }
- iterators.add(iter);
- }
- }
-
- /*
- * We can't eliminate full sstables based on the timestamp of what we've already read like
- * in collectTimeOrderedData, but we still want to eliminate sstable whose maxTimestamp < mostRecentTombstone
- * we've read. We still rely on the sstable ordering by maxTimestamp since if
- * maxTimestamp_s1 > maxTimestamp_s0,
- * we're guaranteed that s1 cannot have a row tombstone such that
- * timestamp(tombstone) > maxTimestamp_s0
- * since we necessarily have
- * timestamp(tombstone) <= maxTimestamp_s1
- * In other words, iterating in maxTimestamp order allow to do our mostRecentTombstone elimination
- * in one pass, and minimize the number of sstables for which we read a rowTombstone.
- */
- Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
- List<SSTableReader> skippedSSTables = null;
- long minTimestamp = Long.MAX_VALUE;
- int nonIntersectingSSTables = 0;
-
- for (SSTableReader sstable : view.sstables)
- {
- minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
- // if we've already seen a row tombstone with a timestamp greater
- // than the most recent update to this sstable, we can skip it
- if (sstable.getMaxTimestamp() < returnDeletionInfo.getTopLevelDeletion().markedForDeleteAt)
- break;
-
- if (!filter.shouldInclude(sstable))
- {
- nonIntersectingSSTables++;
- // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
- if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE)
- {
- if (skippedSSTables == null)
- skippedSSTables = new ArrayList<>();
- skippedSSTables.add(sstable);
- }
- continue;
- }
-
- sstable.incrementReadCount();
- OnDiskAtomIterator iter = filter.getSSTableColumnIterator(sstable);
- iterators.add(iter);
- if (iter.getColumnFamily() != null)
- {
- ColumnFamily cf = iter.getColumnFamily();
- returnCF.delete(cf);
- sstablesIterated++;
- }
- }
-
- int includedDueToTombstones = 0;
- // Check for row tombstone in the skipped sstables
- if (skippedSSTables != null)
- {
- for (SSTableReader sstable : skippedSSTables)
- {
- if (sstable.getMaxTimestamp() <= minTimestamp)
- continue;
-
- sstable.incrementReadCount();
- OnDiskAtomIterator iter = filter.getSSTableColumnIterator(sstable);
- ColumnFamily cf = iter.getColumnFamily();
- // we are only interested in row-level tombstones here, and only if markedForDeleteAt is larger than minTimestamp
- if (cf != null && cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt > minTimestamp)
- {
- includedDueToTombstones++;
- iterators.add(iter);
- returnCF.delete(cf.deletionInfo().getTopLevelDeletion());
- sstablesIterated++;
- }
- else
- {
- FileUtils.closeQuietly(iter);
- }
- }
- }
-
- if (Tracing.isTracing())
- Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
- nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
-
- // we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently)
- // and "there used to be data, but it's gone now" (we should cache the empty CF so we don't need to rebuild that slower)
- if (iterators.isEmpty())
- return null;
-
- Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated);
- filter.collateOnDiskAtom(returnCF, iterators, gcBefore);
-
- // Caller is responsible for final removeDeletedCF. This is important for cacheRow to work correctly:
- return returnCF;
- }
- finally
- {
- for (Object iter : iterators)
- if (iter instanceof Closeable)
- FileUtils.closeQuietly((Closeable) iter);
- }
- }
-
- public int getSstablesIterated()
- {
- return sstablesIterated;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
deleted file mode 100644
index a7243a2..0000000
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ /dev/null
@@ -1,565 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import org.apache.cassandra.cache.IRowCacheEntry;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.CellNames;
-import org.apache.cassandra.db.filter.ColumnCounter;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.io.sstable.ColumnNameHelper;
-import org.apache.cassandra.io.sstable.ColumnStats;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.*;
-
-/**
- * A sorted map of columns.
- * This represents the backing map of a colum family.
- *
- * Whether the implementation is thread safe or not is left to the
- * implementing classes.
- */
-public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
-{
- /* The column serializer for this Column Family. Create based on config. */
- public static final ColumnFamilySerializer serializer = new ColumnFamilySerializer();
-
- protected final CFMetaData metadata;
-
- protected ColumnFamily(CFMetaData metadata)
- {
- assert metadata != null;
- this.metadata = metadata;
- }
-
- public <T extends ColumnFamily> T cloneMeShallow(ColumnFamily.Factory<T> factory, boolean reversedInsertOrder)
- {
- T cf = factory.create(metadata, reversedInsertOrder);
- cf.delete(this);
- return cf;
- }
-
- public ColumnFamily cloneMeShallow()
- {
- return cloneMeShallow(false);
- }
-
- public ColumnFamily cloneMeShallow(boolean reversed)
- {
- return cloneMeShallow(getFactory(), reversed);
- }
-
- public ColumnFamilyType getType()
- {
- return metadata.cfType;
- }
-
- public int liveCQL3RowCount(long now)
- {
- ColumnCounter counter = getComparator().isDense()
- ? new ColumnCounter(now)
- : new ColumnCounter.GroupByPrefix(now, getComparator(), metadata.clusteringColumns().size());
- return counter.countAll(this).live();
- }
-
- /**
- * Clones the column map.
- */
- public abstract ColumnFamily cloneMe();
-
- public UUID id()
- {
- return metadata.cfId;
- }
-
- /**
- * @return The CFMetaData for this row
- */
- public CFMetaData metadata()
- {
- return metadata;
- }
-
- public void addColumn(CellName name, ByteBuffer value, long timestamp)
- {
- addColumn(name, value, timestamp, 0);
- }
-
- public void addColumn(CellName name, ByteBuffer value, long timestamp, int timeToLive)
- {
- assert !metadata().isCounter();
- Cell cell = AbstractCell.create(name, value, timestamp, timeToLive, metadata());
- addColumn(cell);
- }
-
- public void addCounter(CellName name, long value)
- {
- addColumn(new BufferCounterUpdateCell(name, value, FBUtilities.timestampMicros()));
- }
-
- public void addTombstone(CellName name, ByteBuffer localDeletionTime, long timestamp)
- {
- addColumn(new BufferDeletedCell(name, localDeletionTime, timestamp));
- }
-
- public void addTombstone(CellName name, int localDeletionTime, long timestamp)
- {
- addColumn(new BufferDeletedCell(name, localDeletionTime, timestamp));
- }
-
- public void addAtom(OnDiskAtom atom)
- {
- if (atom instanceof Cell)
- {
- addColumn((Cell)atom);
- }
- else
- {
- assert atom instanceof RangeTombstone;
- delete((RangeTombstone)atom);
- }
- }
-
- /**
- * Clear this column family, removing all columns and deletion info.
- */
- public abstract void clear();
-
- /**
- * Returns a {@link DeletionInfo.InOrderTester} for the deletionInfo() of
- * this column family. Please note that for ThreadSafe implementation of ColumnFamily,
- * this tester will remain valid even if new tombstones are added to this ColumnFamily
- * *as long as said addition is done in comparator order*. For AtomicSortedColumns,
- * the tester will correspond to the state of when this method is called.
- */
- public DeletionInfo.InOrderTester inOrderDeletionTester()
- {
- return deletionInfo().inOrderTester();
- }
-
- /**
- * Returns the factory used for this ISortedColumns implementation.
- */
- public abstract Factory getFactory();
-
- public abstract DeletionInfo deletionInfo();
- public abstract void setDeletionInfo(DeletionInfo info);
-
- public abstract void delete(DeletionInfo info);
- public abstract void delete(DeletionTime deletionTime);
- protected abstract void delete(RangeTombstone tombstone);
-
- public abstract SearchIterator<CellName, Cell> searchIterator();
-
- /**
- * Purges top-level and range tombstones whose localDeletionTime is older than gcBefore.
- * @param gcBefore a timestamp (in seconds) before which tombstones should be purged
- */
- public abstract void purgeTombstones(int gcBefore);
-
- /**
- * Adds a cell to this cell map.
- * If a cell with the same name is already present in the map, it will
- * be replaced by the newly added cell.
- */
- public abstract void addColumn(Cell cell);
-
- /**
- * Adds a cell if it's non-gc-able and isn't shadowed by a partition/range tombstone with a higher timestamp.
- * Requires that the cell to add is sorted strictly after the last cell in the container.
- */
- public abstract void maybeAppendColumn(Cell cell, DeletionInfo.InOrderTester tester, int gcBefore);
-
- /**
- * Appends a cell. Requires that the cell to add is sorted strictly after the last cell in the container.
- */
- public abstract void appendColumn(Cell cell);
-
- /**
- * Adds all the columns of a given column map to this column map.
- * This is equivalent to:
- * <code>
- * for (Cell c : cm)
- * addColumn(c, ...);
- * </code>
- * but is potentially faster.
- */
- public abstract void addAll(ColumnFamily cm);
-
- /**
- * Get a column given its name, returning null if the column is not
- * present.
- */
- public abstract Cell getColumn(CellName name);
-
- /**
- * Returns an iterable with the names of columns in this column map in the same order
- * as the underlying columns themselves.
- */
- public abstract Iterable<CellName> getColumnNames();
-
- /**
- * Returns the columns of this column map as a collection.
- * The columns in the returned collection should be sorted as the columns
- * in this map.
- */
- public abstract Collection<Cell> getSortedColumns();
-
- /**
- * Returns the columns of this column map as a collection.
- * The columns in the returned collection should be sorted in reverse
- * order of the columns in this map.
- */
- public abstract Collection<Cell> getReverseSortedColumns();
-
- /**
- * Returns the number of columns in this map.
- */
- public abstract int getColumnCount();
-
- /**
- * Returns whether or not there are any columns present.
- */
- public abstract boolean hasColumns();
-
- /**
- * Returns true if this contains no columns or deletion info
- */
- public boolean isEmpty()
- {
- return deletionInfo().isLive() && !hasColumns();
- }
-
- /**
- * Returns an iterator over the columns of this map that returns only the matching @param slices.
- * The provided slices must be in order and must be non-overlapping.
- */
- public abstract Iterator<Cell> iterator(ColumnSlice[] slices);
-
- /**
- * Returns a reversed iterator over the columns of this map that returns only the matching @param slices.
- * The provided slices must be in reversed order and must be non-overlapping.
- */
- public abstract Iterator<Cell> reverseIterator(ColumnSlice[] slices);
-
- /**
- * Returns if this map only support inserts in reverse order.
- */
- public abstract boolean isInsertReversed();
-
- /**
- * If `columns` has any tombstones (top-level or range tombstones), they will be applied to this set of columns.
- */
- public void delete(ColumnFamily columns)
- {
- delete(columns.deletionInfo());
- }
-
- /*
- * This function will calculate the difference between 2 column families.
- * The external input is assumed to be a superset of internal.
- */
- public ColumnFamily diff(ColumnFamily cfComposite)
- {
- assert cfComposite.id().equals(id());
- ColumnFamily cfDiff = ArrayBackedSortedColumns.factory.create(metadata);
- cfDiff.delete(cfComposite.deletionInfo());
-
- // (don't need to worry about cfNew containing Columns that are shadowed by
- // the delete tombstone, since cfNew was generated by CF.resolve, which
- // takes care of those for us.)
- for (Cell cellExternal : cfComposite)
- {
- CellName cName = cellExternal.name();
- Cell cellInternal = getColumn(cName);
- if (cellInternal == null)
- {
- cfDiff.addColumn(cellExternal);
- }
- else
- {
- Cell cellDiff = cellInternal.diff(cellExternal);
- if (cellDiff != null)
- {
- cfDiff.addColumn(cellDiff);
- }
- }
- }
-
- cfDiff.setDeletionInfo(deletionInfo().diff(cfComposite.deletionInfo()));
-
- if (!cfDiff.isEmpty())
- return cfDiff;
-
- return null;
- }
-
- public long dataSize()
- {
- long size = 0;
- for (Cell cell : this)
- size += cell.cellDataSize();
- return size;
- }
-
- public long maxTimestamp()
- {
- long maxTimestamp = deletionInfo().maxTimestamp();
- for (Cell cell : this)
- maxTimestamp = Math.max(maxTimestamp, cell.timestamp());
- return maxTimestamp;
- }
-
- @Override
- public int hashCode()
- {
- HashCodeBuilder builder = new HashCodeBuilder(373, 75437)
- .append(metadata)
- .append(deletionInfo());
- for (Cell cell : this)
- builder.append(cell);
- return builder.toHashCode();
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- return true;
- if (o == null || !(o instanceof ColumnFamily))
- return false;
-
- ColumnFamily comparison = (ColumnFamily) o;
-
- return metadata.equals(comparison.metadata)
- && deletionInfo().equals(comparison.deletionInfo())
- && ByteBufferUtil.compareUnsigned(digest(this), digest(comparison)) == 0;
- }
-
- @Override
- public String toString()
- {
- StringBuilder sb = new StringBuilder("ColumnFamily(");
- sb.append(metadata.cfName);
-
- if (isMarkedForDelete())
- sb.append(" -").append(deletionInfo()).append("-");
-
- sb.append(" [").append(CellNames.getColumnsString(getComparator(), this)).append("])");
- return sb.toString();
- }
-
- public static ByteBuffer digest(ColumnFamily cf)
- {
- MessageDigest digest = FBUtilities.threadLocalMD5Digest();
- if (cf != null)
- cf.updateDigest(digest);
- return ByteBuffer.wrap(digest.digest());
- }
-
- public void updateDigest(MessageDigest digest)
- {
- for (Cell cell : this)
- cell.updateDigest(digest);
-
- deletionInfo().updateDigest(digest);
- }
-
- public static ColumnFamily diff(ColumnFamily cf1, ColumnFamily cf2)
- {
- if (cf1 == null)
- return cf2;
- return cf1.diff(cf2);
- }
-
- public ColumnStats getColumnStats()
- {
- // note that we default to MIN_VALUE/MAX_VALUE here to be able to override them later in this method
- // we are checking row/range tombstones and actual cells - there should always be data that overrides
- // these with actual values
- ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
- ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
- StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
- ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
- List<ByteBuffer> minColumnNamesSeen = Collections.emptyList();
- List<ByteBuffer> maxColumnNamesSeen = Collections.emptyList();
- boolean hasLegacyCounterShards = false;
-
- if (deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
- {
- tombstones.update(deletionInfo().getTopLevelDeletion().localDeletionTime);
- maxDeletionTimeTracker.update(deletionInfo().getTopLevelDeletion().localDeletionTime);
- minTimestampTracker.update(deletionInfo().getTopLevelDeletion().markedForDeleteAt);
- maxTimestampTracker.update(deletionInfo().getTopLevelDeletion().markedForDeleteAt);
- }
- Iterator<RangeTombstone> it = deletionInfo().rangeIterator();
- while (it.hasNext())
- {
- RangeTombstone rangeTombstone = it.next();
- tombstones.update(rangeTombstone.getLocalDeletionTime());
- minTimestampTracker.update(rangeTombstone.timestamp());
- maxTimestampTracker.update(rangeTombstone.timestamp());
- maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
- minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, rangeTombstone.min, metadata.comparator);
- maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, rangeTombstone.max, metadata.comparator);
- }
-
- for (Cell cell : this)
- {
- minTimestampTracker.update(cell.timestamp());
- maxTimestampTracker.update(cell.timestamp());
- maxDeletionTimeTracker.update(cell.getLocalDeletionTime());
-
- int deletionTime = cell.getLocalDeletionTime();
- if (deletionTime < Integer.MAX_VALUE)
- tombstones.update(deletionTime);
- minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, cell.name(), metadata.comparator);
- maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, cell.name(), metadata.comparator);
- if (cell instanceof CounterCell)
- hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) cell).hasLegacyShards();
- }
- return new ColumnStats(getColumnCount(),
- minTimestampTracker.get(),
- maxTimestampTracker.get(),
- maxDeletionTimeTracker.get(),
- tombstones,
- minColumnNamesSeen,
- maxColumnNamesSeen,
- hasLegacyCounterShards);
- }
-
- public boolean isMarkedForDelete()
- {
- return !deletionInfo().isLive();
- }
-
- /**
- * @return the comparator whose sorting order the contained columns conform to
- */
- public CellNameType getComparator()
- {
- return metadata.comparator;
- }
-
- public boolean hasOnlyTombstones(long now)
- {
- for (Cell cell : this)
- if (cell.isLive(now))
- return false;
- return true;
- }
-
- public Iterator<Cell> iterator()
- {
- return getSortedColumns().iterator();
- }
-
- public Iterator<Cell> reverseIterator()
- {
- return getReverseSortedColumns().iterator();
- }
-
- public Map<CellName, ByteBuffer> asMap()
- {
- ImmutableMap.Builder<CellName, ByteBuffer> builder = ImmutableMap.builder();
- for (Cell cell : this)
- builder.put(cell.name(), cell.value());
- return builder.build();
- }
-
- public static ColumnFamily fromBytes(ByteBuffer bytes)
- {
- if (bytes == null)
- return null;
-
- try
- {
- return serializer.deserialize(new DataInputStream(ByteBufferUtil.inputStream(bytes)),
- ArrayBackedSortedColumns.factory,
- ColumnSerializer.Flag.LOCAL,
- MessagingService.current_version);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public ByteBuffer toBytes()
- {
- try (DataOutputBuffer out = new DataOutputBuffer())
- {
- serializer.serialize(this, out, MessagingService.current_version);
- return ByteBuffer.wrap(out.getData(), 0, out.getLength());
- }
- }
-
-
- /**
- * @return an iterator where the removes are carried out once everything has been iterated
- */
- public abstract BatchRemoveIterator<Cell> batchRemoveIterator();
-
- public abstract static class Factory <T extends ColumnFamily>
- {
- /**
- * Returns a (initially empty) column map whose columns are sorted
- * according to the provided comparator.
- * The {@code insertReversed} flag is an hint on how we expect insertion to be perfomed,
- * either in sorted or reverse sorted order. This is used by ArrayBackedSortedColumns to
- * allow optimizing for both forward and reversed slices. This does not matter for ThreadSafeSortedColumns.
- * Note that this is only an hint on how we expect to do insertion, this does not change the map sorting.
- */
- public abstract T create(CFMetaData metadata, boolean insertReversed, int initialCapacity);
-
- public T create(CFMetaData metadata, boolean insertReversed)
- {
- return create(metadata, insertReversed, 0);
- }
-
- public T create(CFMetaData metadata)
- {
- return create(metadata, false);
- }
-
- public T create(String keyspace, String cfName)
- {
- return create(Schema.instance.getCFMetaData(keyspace, cfName));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
deleted file mode 100644
index 928c21f..0000000
--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.io.ISSTableSerializer;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.UUIDSerializer;
-
-public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily>, ISSTableSerializer<ColumnFamily>
-{
- /*
- * Serialized ColumnFamily format:
- *
- * [serialized for intra-node writes only, e.g. returning a query result]
- * <cf nullability boolean: false if the cf is null>
- * <cf id>
- *
- * [in sstable only]
- * <column bloom filter>
- * <sparse column index, start/finish columns every ColumnIndexSizeInKB of data>
- *
- * [always present]
- * <local deletion time>
- * <client-provided deletion time>
- * <column count>
- * <columns, serialized individually>
- */
- public void serialize(ColumnFamily cf, DataOutputPlus out, int version)
- {
- try
- {
- if (cf == null)
- {
- out.writeBoolean(false);
- return;
- }
-
- out.writeBoolean(true);
- serializeCfId(cf.id(), out, version);
- cf.getComparator().deletionInfoSerializer().serialize(cf.deletionInfo(), out, version);
- ColumnSerializer columnSerializer = cf.getComparator().columnSerializer();
- int count = cf.getColumnCount();
- out.writeInt(count);
- int written = 0;
- for (Cell cell : cf)
- {
- columnSerializer.serialize(cell, out);
- written++;
- }
- assert count == written: "Table had " + count + " columns, but " + written + " written";
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public ColumnFamily deserialize(DataInput in, int version) throws IOException
- {
- return deserialize(in, ColumnSerializer.Flag.LOCAL, version);
- }
-
- public ColumnFamily deserialize(DataInput in, ColumnSerializer.Flag flag, int version) throws IOException
- {
- return deserialize(in, ArrayBackedSortedColumns.factory, flag, version);
- }
-
- public ColumnFamily deserialize(DataInput in, ColumnFamily.Factory factory, ColumnSerializer.Flag flag, int version) throws IOException
- {
- if (!in.readBoolean())
- return null;
-
- ColumnFamily cf = factory.create(Schema.instance.getCFMetaData(deserializeCfId(in, version)));
-
- if (cf.metadata().isSuper() && version < MessagingService.VERSION_20)
- {
- SuperColumns.deserializerSuperColumnFamily(in, cf, flag, version);
- }
- else
- {
- cf.delete(cf.getComparator().deletionInfoSerializer().deserialize(in, version));
-
- ColumnSerializer columnSerializer = cf.getComparator().columnSerializer();
- int size = in.readInt();
- for (int i = 0; i < size; ++i)
- cf.addColumn(columnSerializer.deserialize(in, flag));
- }
- return cf;
- }
-
- public long contentSerializedSize(ColumnFamily cf, TypeSizes typeSizes, int version)
- {
- long size = cf.getComparator().deletionInfoSerializer().serializedSize(cf.deletionInfo(), typeSizes, version);
- size += typeSizes.sizeof(cf.getColumnCount());
- ColumnSerializer columnSerializer = cf.getComparator().columnSerializer();
- for (Cell cell : cf)
- size += columnSerializer.serializedSize(cell, typeSizes);
- return size;
- }
-
- public long serializedSize(ColumnFamily cf, TypeSizes typeSizes, int version)
- {
- if (cf == null)
- {
- return typeSizes.sizeof(false);
- }
- else
- {
- return typeSizes.sizeof(true) /* nullness bool */
- + cfIdSerializedSize(cf.id(), typeSizes, version) /* id */
- + contentSerializedSize(cf, typeSizes, version);
- }
- }
-
- public long serializedSize(ColumnFamily cf, int version)
- {
- return serializedSize(cf, TypeSizes.NATIVE, version);
- }
-
- public void serializeForSSTable(ColumnFamily cf, DataOutputPlus out)
- {
- // Column families shouldn't be written directly to disk, use ColumnIndex.Builder instead
- throw new UnsupportedOperationException();
- }
-
- public ColumnFamily deserializeFromSSTable(DataInput in, Version version)
- {
- throw new UnsupportedOperationException();
- }
-
- public void serializeCfId(UUID cfId, DataOutputPlus out, int version) throws IOException
- {
- UUIDSerializer.serializer.serialize(cfId, out, version);
- }
-
- public UUID deserializeCfId(DataInput in, int version) throws IOException
- {
- UUID cfId = UUIDSerializer.serializer.deserialize(in, version);
- if (Schema.instance.getCF(cfId) == null)
- throw new UnknownColumnFamilyException("Couldn't find cfId=" + cfId, cfId);
-
- return cfId;
- }
-
- public int cfIdSerializedSize(UUID cfId, TypeSizes typeSizes, int version)
- {
- return typeSizes.sizeof(cfId);
- }
-}