You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/12/17 17:53:13 UTC
[5/6] Rename Column to Cell
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index c2134c2..2ea60f1 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -54,7 +54,7 @@ import org.apache.cassandra.utils.*;
* Whether the implementation is thread safe or not is left to the
* implementing classes.
*/
-public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
+public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
{
/* The column serializer for this Column Family. Create based on config. */
public static final ColumnFamilySerializer serializer = new ColumnFamilySerializer();
@@ -102,20 +102,20 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
return metadata;
}
- public void addIfRelevant(Column column, DeletionInfo.InOrderTester tester, int gcBefore)
+ public void addIfRelevant(Cell cell, DeletionInfo.InOrderTester tester, int gcBefore)
{
- // the column itself must be not gc-able (it is live, or a still relevant tombstone), (1)
- // and if its container is deleted, the column must be changed more recently than the container tombstone (2)
- if ((column.getLocalDeletionTime() >= gcBefore) // (1)
- && (!tester.isDeleted(column.name(), column.timestamp()))) // (2)
+ // the cell itself must be not gc-able (it is live, or a still relevant tombstone), (1)
+ // and if its container is deleted, the cell must be changed more recently than the container tombstone (2)
+ if ((cell.getLocalDeletionTime() >= gcBefore) // (1)
+ && (!tester.isDeleted(cell.name(), cell.timestamp()))) // (2)
{
- addColumn(column);
+ addColumn(cell);
}
}
- public void addColumn(Column column)
+ public void addColumn(Cell cell)
{
- addColumn(column, HeapAllocator.instance);
+ addColumn(cell, HeapAllocator.instance);
}
public void addColumn(CellName name, ByteBuffer value, long timestamp)
@@ -126,30 +126,30 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
public void addColumn(CellName name, ByteBuffer value, long timestamp, int timeToLive)
{
assert !metadata().getDefaultValidator().isCommutative();
- Column column = Column.create(name, value, timestamp, timeToLive, metadata());
- addColumn(column);
+ Cell cell = Cell.create(name, value, timestamp, timeToLive, metadata());
+ addColumn(cell);
}
public void addCounter(CellName name, long value)
{
- addColumn(new CounterUpdateColumn(name, value, System.currentTimeMillis()));
+ addColumn(new CounterUpdateCell(name, value, System.currentTimeMillis()));
}
public void addTombstone(CellName name, ByteBuffer localDeletionTime, long timestamp)
{
- addColumn(new DeletedColumn(name, localDeletionTime, timestamp));
+ addColumn(new DeletedCell(name, localDeletionTime, timestamp));
}
public void addTombstone(CellName name, int localDeletionTime, long timestamp)
{
- addColumn(new DeletedColumn(name, localDeletionTime, timestamp));
+ addColumn(new DeletedCell(name, localDeletionTime, timestamp));
}
public void addAtom(OnDiskAtom atom)
{
- if (atom instanceof Column)
+ if (atom instanceof Cell)
{
- addColumn((Column)atom);
+ addColumn((Cell)atom);
}
else
{
@@ -194,35 +194,35 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
public abstract void purgeTombstones(int gcBefore);
/**
- * Adds a column to this column map.
- * If a column with the same name is already present in the map, it will
- * be replaced by the newly added column.
+ * Adds a cell to this cell map.
+ * If a cell with the same name is already present in the map, it will
+ * be replaced by the newly added cell.
*/
- public abstract void addColumn(Column column, Allocator allocator);
+ public abstract void addColumn(Cell cell, Allocator allocator);
/**
* Adds all the columns of a given column map to this column map.
* This is equivalent to:
* <code>
- * for (Column c : cm)
+ * for (Cell c : cm)
* addColumn(c, ...);
* </code>
* but is potentially faster.
*/
- public abstract void addAll(ColumnFamily cm, Allocator allocator, Function<Column, Column> transformation);
+ public abstract void addAll(ColumnFamily cm, Allocator allocator, Function<Cell, Cell> transformation);
/**
- * Replace oldColumn if present by newColumn.
- * Returns true if oldColumn was present and thus replaced.
- * oldColumn and newColumn should have the same name.
+ * Replace oldCell if present by newCell.
+ * Returns true if oldCell was present and thus replaced.
+ * oldCell and newCell should have the same name.
*/
- public abstract boolean replace(Column oldColumn, Column newColumn);
+ public abstract boolean replace(Cell oldCell, Cell newCell);
/**
* Get a column given its name, returning null if the column is not
* present.
*/
- public abstract Column getColumn(CellName name);
+ public abstract Cell getColumn(CellName name);
/**
* Returns an iterable with the names of columns in this column map in the same order
@@ -235,14 +235,14 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
* The columns in the returned collection should be sorted as the columns
* in this map.
*/
- public abstract Collection<Column> getSortedColumns();
+ public abstract Collection<Cell> getSortedColumns();
/**
* Returns the columns of this column map as a collection.
* The columns in the returned collection should be sorted in reverse
* order of the columns in this map.
*/
- public abstract Collection<Column> getReverseSortedColumns();
+ public abstract Collection<Cell> getReverseSortedColumns();
/**
* Returns the number of columns in this map.
@@ -261,13 +261,13 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
* Returns an iterator over the columns of this map that returns only the matching @param slices.
* The provided slices must be in order and must be non-overlapping.
*/
- public abstract Iterator<Column> iterator(ColumnSlice[] slices);
+ public abstract Iterator<Cell> iterator(ColumnSlice[] slices);
/**
* Returns a reversed iterator over the columns of this map that returns only the matching @param slices.
* The provided slices must be in reversed order and must be non-overlapping.
*/
- public abstract Iterator<Column> reverseIterator(ColumnSlice[] slices);
+ public abstract Iterator<Cell> reverseIterator(ColumnSlice[] slices);
/**
* Returns if this map only support inserts in reverse order.
@@ -284,7 +284,7 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
public void addAll(ColumnFamily cf, Allocator allocator)
{
- addAll(cf, allocator, Functions.<Column>identity());
+ addAll(cf, allocator, Functions.<Cell>identity());
}
/*
@@ -300,20 +300,20 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
// (don't need to worry about cfNew containing Columns that are shadowed by
// the delete tombstone, since cfNew was generated by CF.resolve, which
// takes care of those for us.)
- for (Column columnExternal : cfComposite)
+ for (Cell cellExternal : cfComposite)
{
- CellName cName = columnExternal.name();
- Column columnInternal = getColumn(cName);
- if (columnInternal == null)
+ CellName cName = cellExternal.name();
+ Cell cellInternal = getColumn(cName);
+ if (cellInternal == null)
{
- cfDiff.addColumn(columnExternal);
+ cfDiff.addColumn(cellExternal);
}
else
{
- Column columnDiff = columnInternal.diff(columnExternal);
- if (columnDiff != null)
+ Cell cellDiff = cellInternal.diff(cellExternal);
+ if (cellDiff != null)
{
- cfDiff.addColumn(columnDiff);
+ cfDiff.addColumn(cellDiff);
}
}
}
@@ -326,16 +326,16 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
public long dataSize()
{
long size = 0;
- for (Column column : this)
- size += column.dataSize();
+ for (Cell cell : this)
+ size += cell.dataSize();
return size;
}
public long maxTimestamp()
{
long maxTimestamp = deletionInfo().maxTimestamp();
- for (Column column : this)
- maxTimestamp = Math.max(maxTimestamp, column.maxTimestamp());
+ for (Cell cell : this)
+ maxTimestamp = Math.max(maxTimestamp, cell.maxTimestamp());
return maxTimestamp;
}
@@ -345,8 +345,8 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
HashCodeBuilder builder = new HashCodeBuilder(373, 75437)
.append(metadata)
.append(deletionInfo());
- for (Column column : this)
- builder.append(column);
+ for (Cell cell : this)
+ builder.append(cell);
return builder.toHashCode();
}
@@ -388,8 +388,8 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
public void updateDigest(MessageDigest digest)
{
- for (Column column : this)
- column.updateDigest(digest);
+ for (Cell cell : this)
+ cell.updateDigest(digest);
}
public static ColumnFamily diff(ColumnFamily cf1, ColumnFamily cf2)
@@ -420,16 +420,16 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
int maxLocalDeletionTime = Integer.MIN_VALUE;
List<ByteBuffer> minColumnNamesSeen = Collections.emptyList();
List<ByteBuffer> maxColumnNamesSeen = Collections.emptyList();
- for (Column column : this)
+ for (Cell cell : this)
{
- minTimestampSeen = Math.min(minTimestampSeen, column.minTimestamp());
- maxTimestampSeen = Math.max(maxTimestampSeen, column.maxTimestamp());
- maxLocalDeletionTime = Math.max(maxLocalDeletionTime, column.getLocalDeletionTime());
- int deletionTime = column.getLocalDeletionTime();
+ minTimestampSeen = Math.min(minTimestampSeen, cell.minTimestamp());
+ maxTimestampSeen = Math.max(maxTimestampSeen, cell.maxTimestamp());
+ maxLocalDeletionTime = Math.max(maxLocalDeletionTime, cell.getLocalDeletionTime());
+ int deletionTime = cell.getLocalDeletionTime();
if (deletionTime < Integer.MAX_VALUE)
tombstones.update(deletionTime);
- minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, column.name, metadata.comparator);
- maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, column.name, metadata.comparator);
+ minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, cell.name, metadata.comparator);
+ maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, cell.name, metadata.comparator);
}
return new ColumnStats(getColumnCount(), minTimestampSeen, maxTimestampSeen, maxLocalDeletionTime, tombstones, minColumnNamesSeen, maxColumnNamesSeen);
}
@@ -449,18 +449,18 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
public boolean hasOnlyTombstones(long now)
{
- for (Column column : this)
- if (column.isLive(now))
+ for (Cell cell : this)
+ if (cell.isLive(now))
return false;
return true;
}
- public Iterator<Column> iterator()
+ public Iterator<Cell> iterator()
{
return getSortedColumns().iterator();
}
- public Iterator<Column> reverseIterator()
+ public Iterator<Cell> reverseIterator()
{
return getReverseSortedColumns().iterator();
}
@@ -473,8 +473,8 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
// Do we have colums that are either deleted by the container or gcable tombstone?
DeletionInfo.InOrderTester tester = inOrderDeletionTester();
- for (Column column : this)
- if (tester.isDeleted(column) || column.hasIrrelevantData(gcBefore))
+ for (Cell cell : this)
+ if (tester.isDeleted(cell) || cell.hasIrrelevantData(gcBefore))
return true;
return false;
@@ -483,8 +483,8 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
public Map<CellName, ByteBuffer> asMap()
{
ImmutableMap.Builder<CellName, ByteBuffer> builder = ImmutableMap.builder();
- for (Column column : this)
- builder.put(column.name, column.value);
+ for (Cell cell : this)
+ builder.put(cell.name, cell.value);
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
index 13ec6fc..92aa955 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
@@ -72,9 +72,9 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
int count = cf.getColumnCount();
out.writeInt(count);
int written = 0;
- for (Column column : cf)
+ for (Cell cell : cf)
{
- columnSerializer.serialize(column, out);
+ columnSerializer.serialize(cell, out);
written++;
}
assert count == written: "Column family had " + count + " columns, but " + written + " written";
@@ -131,8 +131,8 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
size += cf.getComparator().deletionInfoSerializer().serializedSize(cf.deletionInfo(), typeSizes, version);
size += typeSizes.sizeof(cf.getColumnCount());
ColumnSerializer columnSerializer = cf.getComparator().columnSerializer();
- for (Column column : cf)
- size += columnSerializer.serializedSize(column, typeSizes);
+ for (Cell cell : cf)
+ size += columnSerializer.serializedSize(cell, typeSizes);
}
return size;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index f00e281..80f77bf 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -903,12 +903,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*/
public static ColumnFamily removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
{
- Iterator<Column> iter = cf.iterator();
+ Iterator<Cell> iter = cf.iterator();
DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();
boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty();
while (iter.hasNext())
{
- Column c = iter.next();
+ Cell c = iter.next();
// remove columns if
// (a) the column itself is gcable or
// (b) the column is shadowed by a CF tombstone
@@ -926,7 +926,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// returns true if
// 1. this column has been dropped from schema and
// 2. if it has been re-added since then, this particular column was inserted before the last drop
- private static boolean isDroppedColumn(Column c, CFMetaData meta)
+ private static boolean isDroppedColumn(Cell c, CFMetaData meta)
{
Long droppedAt = meta.getDroppedColumns().get(c.name().cql3ColumnName());
return droppedAt != null && c.timestamp() <= droppedAt;
@@ -937,7 +937,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (cf == null || cf.metadata.getDroppedColumns().isEmpty())
return;
- Iterator<Column> iter = cf.iterator();
+ Iterator<Cell> iter = cf.iterator();
while (iter.hasNext())
if (isDroppedColumn(iter.next(), metadata))
iter.remove();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 3e6d55d..f00f958 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -126,7 +126,7 @@ public class ColumnIndex
Iterator<RangeTombstone> rangeIter = cf.deletionInfo().rangeIterator();
RangeTombstone tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
- for (Column c : cf)
+ for (Cell c : cf)
{
while (tombstone != null && comparator.compare(c.name(), tombstone.min) >= 0)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/ColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java
index a1c6ebd..3cfd900 100644
--- a/src/java/org/apache/cassandra/db/ColumnSerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnSerializer.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.ByteBufferUtil;
-public class ColumnSerializer implements ISerializer<Column>
+public class ColumnSerializer implements ISerializer<Cell>
{
public final static int DELETION_MASK = 0x01;
public final static int EXPIRATION_MASK = 0x02;
@@ -61,24 +61,24 @@ public class ColumnSerializer implements ISerializer<Column>
this.type = type;
}
- public void serialize(Column column, DataOutput out) throws IOException
+ public void serialize(Cell cell, DataOutput out) throws IOException
{
- assert !column.name().isEmpty();
- type.cellSerializer().serialize(column.name(), out);
+ assert !cell.name().isEmpty();
+ type.cellSerializer().serialize(cell.name(), out);
try
{
- out.writeByte(column.serializationFlags());
- if (column instanceof CounterColumn)
+ out.writeByte(cell.serializationFlags());
+ if (cell instanceof CounterCell)
{
- out.writeLong(((CounterColumn)column).timestampOfLastDelete());
+ out.writeLong(((CounterCell) cell).timestampOfLastDelete());
}
- else if (column instanceof ExpiringColumn)
+ else if (cell instanceof ExpiringCell)
{
- out.writeInt(((ExpiringColumn) column).getTimeToLive());
- out.writeInt(column.getLocalDeletionTime());
+ out.writeInt(((ExpiringCell) cell).getTimeToLive());
+ out.writeInt(cell.getLocalDeletionTime());
}
- out.writeLong(column.timestamp());
- ByteBufferUtil.writeWithLength(column.value(), out);
+ out.writeLong(cell.timestamp());
+ ByteBufferUtil.writeWithLength(cell.value(), out);
}
catch (IOException e)
{
@@ -86,7 +86,7 @@ public class ColumnSerializer implements ISerializer<Column>
}
}
- public Column deserialize(DataInput in) throws IOException
+ public Cell deserialize(DataInput in) throws IOException
{
return deserialize(in, Flag.LOCAL);
}
@@ -96,12 +96,12 @@ public class ColumnSerializer implements ISerializer<Column>
* deserialize comes from a remote host. If it does, then we must clear
* the delta.
*/
- public Column deserialize(DataInput in, ColumnSerializer.Flag flag) throws IOException
+ public Cell deserialize(DataInput in, ColumnSerializer.Flag flag) throws IOException
{
return deserialize(in, flag, Integer.MIN_VALUE);
}
- public Column deserialize(DataInput in, ColumnSerializer.Flag flag, int expireBefore) throws IOException
+ public Cell deserialize(DataInput in, ColumnSerializer.Flag flag, int expireBefore) throws IOException
{
CellName name = type.cellSerializer().deserialize(in);
@@ -109,14 +109,14 @@ public class ColumnSerializer implements ISerializer<Column>
return deserializeColumnBody(in, name, b, flag, expireBefore);
}
- Column deserializeColumnBody(DataInput in, CellName name, int mask, ColumnSerializer.Flag flag, int expireBefore) throws IOException
+ Cell deserializeColumnBody(DataInput in, CellName name, int mask, ColumnSerializer.Flag flag, int expireBefore) throws IOException
{
if ((mask & COUNTER_MASK) != 0)
{
long timestampOfLastDelete = in.readLong();
long ts = in.readLong();
ByteBuffer value = ByteBufferUtil.readWithLength(in);
- return CounterColumn.create(name, value, ts, timestampOfLastDelete, flag);
+ return CounterCell.create(name, value, ts, timestampOfLastDelete, flag);
}
else if ((mask & EXPIRATION_MASK) != 0)
{
@@ -124,17 +124,17 @@ public class ColumnSerializer implements ISerializer<Column>
int expiration = in.readInt();
long ts = in.readLong();
ByteBuffer value = ByteBufferUtil.readWithLength(in);
- return ExpiringColumn.create(name, value, ts, ttl, expiration, expireBefore, flag);
+ return ExpiringCell.create(name, value, ts, ttl, expiration, expireBefore, flag);
}
else
{
long ts = in.readLong();
ByteBuffer value = ByteBufferUtil.readWithLength(in);
return (mask & COUNTER_UPDATE_MASK) != 0
- ? new CounterUpdateColumn(name, value, ts)
+ ? new CounterUpdateCell(name, value, ts)
: ((mask & DELETION_MASK) == 0
- ? new Column(name, value, ts)
- : new DeletedColumn(name, value, ts));
+ ? new Cell(name, value, ts)
+ : new DeletedCell(name, value, ts));
}
}
@@ -151,9 +151,9 @@ public class ColumnSerializer implements ISerializer<Column>
FileUtils.skipBytesFully(in, length);
}
- public long serializedSize(Column column, TypeSizes typeSizes)
+ public long serializedSize(Cell cell, TypeSizes typeSizes)
{
- return column.serializedSize(type, typeSizes);
+ return cell.serializedSize(type, typeSizes);
}
public static class CorruptColumnException extends IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/CounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterCell.java b/src/java/org/apache/cassandra/db/CounterCell.java
new file mode 100644
index 0000000..0391eb8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/CounterCell.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.net.InetAddress;
+import java.security.MessageDigest;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.cassandra.serializers.MarshalException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.context.IContext.ContextRelationship;
+import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.utils.*;
+
+/**
+ * A column that represents a partitioned counter.
+ */
+public class CounterCell extends Cell
+{
+ private static final Logger logger = LoggerFactory.getLogger(CounterCell.class);
+
+ protected static final CounterContext contextManager = CounterContext.instance();
+
+ private final long timestampOfLastDelete;
+
+ public CounterCell(CellName name, long value, long timestamp)
+ {
+ this(name, contextManager.create(value, HeapAllocator.instance), timestamp);
+ }
+
+ public CounterCell(CellName name, long value, long timestamp, long timestampOfLastDelete)
+ {
+ this(name, contextManager.create(value, HeapAllocator.instance), timestamp, timestampOfLastDelete);
+ }
+
+ public CounterCell(CellName name, ByteBuffer value, long timestamp)
+ {
+ this(name, value, timestamp, Long.MIN_VALUE);
+ }
+
+ public CounterCell(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete)
+ {
+ super(name, value, timestamp);
+ this.timestampOfLastDelete = timestampOfLastDelete;
+ }
+
+ public static CounterCell create(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete, ColumnSerializer.Flag flag)
+ {
+ // #elt being negative means we have to clean delta
+ short count = value.getShort(value.position());
+ if (flag == ColumnSerializer.Flag.FROM_REMOTE || (flag == ColumnSerializer.Flag.LOCAL && count < 0))
+ value = CounterContext.instance().clearAllDelta(value);
+ return new CounterCell(name, value, timestamp, timestampOfLastDelete);
+ }
+
+ @Override
+ public Cell withUpdatedName(CellName newName)
+ {
+ return new CounterCell(newName, value, timestamp, timestampOfLastDelete);
+ }
+
+ public long timestampOfLastDelete()
+ {
+ return timestampOfLastDelete;
+ }
+
+ public long total()
+ {
+ return contextManager.total(value);
+ }
+
+ @Override
+ public int dataSize()
+ {
+ /*
+ * A counter column adds to a Cell :
+ * + 8 bytes for timestampOfLastDelete
+ */
+ return super.dataSize() + TypeSizes.NATIVE.sizeof(timestampOfLastDelete);
+ }
+
+ @Override
+ public int serializedSize(CellNameType type, TypeSizes typeSizes)
+ {
+ return super.serializedSize(type, typeSizes) + typeSizes.sizeof(timestampOfLastDelete);
+ }
+
+ @Override
+ public Cell diff(Cell cell)
+ {
+ assert (cell instanceof CounterCell) || (cell instanceof DeletedCell) : "Wrong class type: " + cell.getClass();
+
+ if (timestamp() < cell.timestamp())
+ return cell;
+
+ // Note that if at that point, cell can't be a tombstone. Indeed,
+ // cell is the result of merging us with other nodes results, and
+ // merging a CounterCell with a tombstone never return a tombstone
+ // unless that tombstone timestamp is greater that the CounterCell
+ // one.
+ assert !(cell instanceof DeletedCell) : "Wrong class type: " + cell.getClass();
+
+ if (timestampOfLastDelete() < ((CounterCell) cell).timestampOfLastDelete())
+ return cell;
+ ContextRelationship rel = contextManager.diff(cell.value(), value());
+ if (ContextRelationship.GREATER_THAN == rel || ContextRelationship.DISJOINT == rel)
+ return cell;
+ return null;
+ }
+
+ /*
+ * We have to special case digest creation for counter column because
+ * we don't want to include the information about which shard of the
+ * context is a delta or not, since this information differs from node to
+ * node.
+ */
+ @Override
+ public void updateDigest(MessageDigest digest)
+ {
+ digest.update(name.toByteBuffer().duplicate());
+ // We don't take the deltas into account in a digest
+ contextManager.updateDigest(digest, value);
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try
+ {
+ buffer.writeLong(timestamp);
+ buffer.writeByte(serializationFlags());
+ buffer.writeLong(timestampOfLastDelete);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ digest.update(buffer.getData(), 0, buffer.getLength());
+ }
+
+ @Override
+ public Cell reconcile(Cell cell, Allocator allocator)
+ {
+ assert (cell instanceof CounterCell) || (cell instanceof DeletedCell) : "Wrong class type: " + cell.getClass();
+
+ // live + tombstone: track last tombstone
+ if (cell.isMarkedForDelete(Long.MIN_VALUE)) // cannot be an expired cell, so the current time is irrelevant
+ {
+ // live < tombstone
+ if (timestamp() < cell.timestamp())
+ {
+ return cell;
+ }
+ // live last delete >= tombstone
+ if (timestampOfLastDelete() >= cell.timestamp())
+ {
+ return this;
+ }
+ // live last delete < tombstone
+ return new CounterCell(name(), value(), timestamp(), cell.timestamp());
+ }
+ // live < live last delete
+ if (timestamp() < ((CounterCell) cell).timestampOfLastDelete())
+ return cell;
+ // live last delete > live
+ if (timestampOfLastDelete() > cell.timestamp())
+ return this;
+ // live + live: merge clocks; update value
+ return new CounterCell(
+ name(),
+ contextManager.merge(value(), cell.value(), allocator),
+ Math.max(timestamp(), cell.timestamp()),
+ Math.max(timestampOfLastDelete(), ((CounterCell) cell).timestampOfLastDelete()));
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ // super.equals() returns false if o is not a CounterCell
+ return super.equals(o) && timestampOfLastDelete == ((CounterCell)o).timestampOfLastDelete;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = super.hashCode();
+ result = 31 * result + (int)(timestampOfLastDelete ^ (timestampOfLastDelete >>> 32));
+ return result;
+ }
+
+ @Override
+ public Cell localCopy(ColumnFamilyStore cfs)
+ {
+ return localCopy(cfs, HeapAllocator.instance);
+ }
+
+ @Override
+ public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
+ {
+ return new CounterCell(name.copy(allocator), allocator.clone(value), timestamp, timestampOfLastDelete);
+ }
+
+ @Override
+ public String getString(CellNameType comparator)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(comparator.getString(name));
+ sb.append(":");
+ sb.append(false);
+ sb.append(":");
+ sb.append(contextManager.toString(value));
+ sb.append("@");
+ sb.append(timestamp());
+ sb.append("!");
+ sb.append(timestampOfLastDelete);
+ return sb.toString();
+ }
+
+ @Override
+ public int serializationFlags()
+ {
+ return ColumnSerializer.COUNTER_MASK;
+ }
+
+ @Override
+ public void validateFields(CFMetaData metadata) throws MarshalException
+ {
+ validateName(metadata);
+ // We cannot use the value validator as for other columns as the CounterColumnType validate a long,
+ // which is not the internal representation of counters
+ contextManager.validateContext(value());
+ }
+
+ /**
+ * Check if a given counterId is found in this CounterCell context.
+ */
+ public boolean hasCounterId(CounterId id)
+ {
+ return contextManager.hasCounterId(value(), id);
+ }
+
+ private CounterCell computeOldShardMerger(int mergeBefore)
+ {
+ ByteBuffer bb = contextManager.computeOldShardMerger(value(), CounterId.getOldLocalCounterIds(), mergeBefore);
+ if (bb == null)
+ return null;
+ else
+ return new CounterCell(name(), bb, timestamp(), timestampOfLastDelete);
+ }
+
+ private CounterCell removeOldShards(int gcBefore)
+ {
+ ByteBuffer bb = contextManager.removeOldShards(value(), gcBefore);
+ if (bb == value())
+ return this;
+ else
+ {
+ return new CounterCell(name(), bb, timestamp(), timestampOfLastDelete);
+ }
+ }
+
+ public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily cf, int gcBefore, int mergeBefore)
+ {
+ mergeAndRemoveOldShards(key, cf, gcBefore, mergeBefore, true);
+ }
+
+ /**
+ * There is two phase to the removal of old shards.
+ * First phase: we merge the old shard value to the current shard and
+ * 'nulify' the old one. We then send the counter context with the old
+ * shard nulified to all other replica.
+ * Second phase: once an old shard has been nulified for longer than
+ * gc_grace (to be sure all other replica had been aware of the merge), we
+ * simply remove that old shard from the context (it's value is 0).
+ * This method does both phases.
+ * (Note that the sendToOtherReplica flag is here only to facilitate
+ * testing. It should be true in real code so use the method above
+ * preferably)
+ */
+ public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily cf, int gcBefore, int mergeBefore, boolean sendToOtherReplica)
+ {
+ ColumnFamily remoteMerger = null;
+
+ for (Cell c : cf)
+ {
+ if (!(c instanceof CounterCell))
+ continue;
+ CounterCell cc = (CounterCell) c;
+ CounterCell shardMerger = cc.computeOldShardMerger(mergeBefore);
+ CounterCell merged = cc;
+ if (shardMerger != null)
+ {
+ merged = (CounterCell) cc.reconcile(shardMerger);
+ if (remoteMerger == null)
+ remoteMerger = cf.cloneMeShallow();
+ remoteMerger.addColumn(merged);
+ }
+ CounterCell cleaned = merged.removeOldShards(gcBefore);
+ if (cleaned != cc)
+ {
+ cf.replace(cc, cleaned);
+ }
+ }
+
+ if (remoteMerger != null && sendToOtherReplica)
+ {
+ try
+ {
+ sendToOtherReplica(key, remoteMerger);
+ }
+ catch (Exception e)
+ {
+ logger.error("Error while sending shard merger mutation to remote endpoints", e);
+ }
+ }
+ }
+
+ public Cell markDeltaToBeCleared()
+ {
+ return new CounterCell(name, contextManager.markDeltaToBeCleared(value), timestamp, timestampOfLastDelete);
+ }
+
+ private static void sendToOtherReplica(DecoratedKey key, ColumnFamily cf) throws RequestExecutionException
+ {
+ RowMutation rm = new RowMutation(cf.metadata().ksName, key.key, cf);
+
+ final InetAddress local = FBUtilities.getBroadcastAddress();
+ String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(local);
+
+ StorageProxy.performWrite(rm, ConsistencyLevel.ANY, localDataCenter, new StorageProxy.WritePerformer()
+ {
+ public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
+ throws OverloadedException
+ {
+ // We should only send to the remote replica, not the local one
+ Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), ImmutableSet.of(local));
+ // Fake local response to be a good lad but we won't wait on the responseHandler
+ responseHandler.response(null);
+ StorageProxy.sendToHintedEndpoints((RowMutation) mutation, remotes, responseHandler, localDataCenter);
+ }
+ }, null, WriteType.SIMPLE);
+
+ // we don't wait for answers
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
deleted file mode 100644
index ac2c88e..0000000
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.net.InetAddress;
-import java.security.MessageDigest;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import org.apache.cassandra.serializers.MarshalException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.context.IContext.ContextRelationship;
-import org.apache.cassandra.exceptions.OverloadedException;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.service.AbstractWriteResponseHandler;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.utils.*;
-
-/**
- * A column that represents a partitioned counter.
- */
-public class CounterColumn extends Column
-{
- private static final Logger logger = LoggerFactory.getLogger(CounterColumn.class);
-
- protected static final CounterContext contextManager = CounterContext.instance();
-
- private final long timestampOfLastDelete;
-
- public CounterColumn(CellName name, long value, long timestamp)
- {
- this(name, contextManager.create(value, HeapAllocator.instance), timestamp);
- }
-
- public CounterColumn(CellName name, long value, long timestamp, long timestampOfLastDelete)
- {
- this(name, contextManager.create(value, HeapAllocator.instance), timestamp, timestampOfLastDelete);
- }
-
- public CounterColumn(CellName name, ByteBuffer value, long timestamp)
- {
- this(name, value, timestamp, Long.MIN_VALUE);
- }
-
- public CounterColumn(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete)
- {
- super(name, value, timestamp);
- this.timestampOfLastDelete = timestampOfLastDelete;
- }
-
- public static CounterColumn create(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete, ColumnSerializer.Flag flag)
- {
- // #elt being negative means we have to clean delta
- short count = value.getShort(value.position());
- if (flag == ColumnSerializer.Flag.FROM_REMOTE || (flag == ColumnSerializer.Flag.LOCAL && count < 0))
- value = CounterContext.instance().clearAllDelta(value);
- return new CounterColumn(name, value, timestamp, timestampOfLastDelete);
- }
-
- @Override
- public Column withUpdatedName(CellName newName)
- {
- return new CounterColumn(newName, value, timestamp, timestampOfLastDelete);
- }
-
- public long timestampOfLastDelete()
- {
- return timestampOfLastDelete;
- }
-
- public long total()
- {
- return contextManager.total(value);
- }
-
- @Override
- public int dataSize()
- {
- /*
- * A counter column adds to a Column :
- * + 8 bytes for timestampOfLastDelete
- */
- return super.dataSize() + TypeSizes.NATIVE.sizeof(timestampOfLastDelete);
- }
-
- @Override
- public int serializedSize(CellNameType type, TypeSizes typeSizes)
- {
- return super.serializedSize(type, typeSizes) + typeSizes.sizeof(timestampOfLastDelete);
- }
-
- @Override
- public Column diff(Column column)
- {
- assert (column instanceof CounterColumn) || (column instanceof DeletedColumn) : "Wrong class type: " + column.getClass();
-
- if (timestamp() < column.timestamp())
- return column;
-
- // Note that if at that point, column can't be a tombstone. Indeed,
- // column is the result of merging us with other nodes results, and
- // merging a CounterColumn with a tombstone never return a tombstone
- // unless that tombstone timestamp is greater that the CounterColumn
- // one.
- assert !(column instanceof DeletedColumn) : "Wrong class type: " + column.getClass();
-
- if (timestampOfLastDelete() < ((CounterColumn)column).timestampOfLastDelete())
- return column;
- ContextRelationship rel = contextManager.diff(column.value(), value());
- if (ContextRelationship.GREATER_THAN == rel || ContextRelationship.DISJOINT == rel)
- return column;
- return null;
- }
-
- /*
- * We have to special case digest creation for counter column because
- * we don't want to include the information about which shard of the
- * context is a delta or not, since this information differs from node to
- * node.
- */
- @Override
- public void updateDigest(MessageDigest digest)
- {
- digest.update(name.toByteBuffer().duplicate());
- // We don't take the deltas into account in a digest
- contextManager.updateDigest(digest, value);
- DataOutputBuffer buffer = new DataOutputBuffer();
- try
- {
- buffer.writeLong(timestamp);
- buffer.writeByte(serializationFlags());
- buffer.writeLong(timestampOfLastDelete);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- digest.update(buffer.getData(), 0, buffer.getLength());
- }
-
- @Override
- public Column reconcile(Column column, Allocator allocator)
- {
- assert (column instanceof CounterColumn) || (column instanceof DeletedColumn) : "Wrong class type: " + column.getClass();
-
- // live + tombstone: track last tombstone
- if (column.isMarkedForDelete(Long.MIN_VALUE)) // cannot be an expired column, so the current time is irrelevant
- {
- // live < tombstone
- if (timestamp() < column.timestamp())
- {
- return column;
- }
- // live last delete >= tombstone
- if (timestampOfLastDelete() >= column.timestamp())
- {
- return this;
- }
- // live last delete < tombstone
- return new CounterColumn(name(), value(), timestamp(), column.timestamp());
- }
- // live < live last delete
- if (timestamp() < ((CounterColumn)column).timestampOfLastDelete())
- return column;
- // live last delete > live
- if (timestampOfLastDelete() > column.timestamp())
- return this;
- // live + live: merge clocks; update value
- return new CounterColumn(
- name(),
- contextManager.merge(value(), column.value(), allocator),
- Math.max(timestamp(), column.timestamp()),
- Math.max(timestampOfLastDelete(), ((CounterColumn)column).timestampOfLastDelete()));
- }
-
- @Override
- public boolean equals(Object o)
- {
- // super.equals() returns false if o is not a CounterColumn
- return super.equals(o) && timestampOfLastDelete == ((CounterColumn)o).timestampOfLastDelete;
- }
-
- @Override
- public int hashCode()
- {
- int result = super.hashCode();
- result = 31 * result + (int)(timestampOfLastDelete ^ (timestampOfLastDelete >>> 32));
- return result;
- }
-
- @Override
- public Column localCopy(ColumnFamilyStore cfs)
- {
- return localCopy(cfs, HeapAllocator.instance);
- }
-
- @Override
- public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
- {
- return new CounterColumn(name.copy(allocator), allocator.clone(value), timestamp, timestampOfLastDelete);
- }
-
- @Override
- public String getString(CellNameType comparator)
- {
- StringBuilder sb = new StringBuilder();
- sb.append(comparator.getString(name));
- sb.append(":");
- sb.append(false);
- sb.append(":");
- sb.append(contextManager.toString(value));
- sb.append("@");
- sb.append(timestamp());
- sb.append("!");
- sb.append(timestampOfLastDelete);
- return sb.toString();
- }
-
- @Override
- public int serializationFlags()
- {
- return ColumnSerializer.COUNTER_MASK;
- }
-
- @Override
- public void validateFields(CFMetaData metadata) throws MarshalException
- {
- validateName(metadata);
- // We cannot use the value validator as for other columns as the CounterColumnType validate a long,
- // which is not the internal representation of counters
- contextManager.validateContext(value());
- }
-
- /**
- * Check if a given counterId is found in this CounterColumn context.
- */
- public boolean hasCounterId(CounterId id)
- {
- return contextManager.hasCounterId(value(), id);
- }
-
- private CounterColumn computeOldShardMerger(int mergeBefore)
- {
- ByteBuffer bb = contextManager.computeOldShardMerger(value(), CounterId.getOldLocalCounterIds(), mergeBefore);
- if (bb == null)
- return null;
- else
- return new CounterColumn(name(), bb, timestamp(), timestampOfLastDelete);
- }
-
- private CounterColumn removeOldShards(int gcBefore)
- {
- ByteBuffer bb = contextManager.removeOldShards(value(), gcBefore);
- if (bb == value())
- return this;
- else
- {
- return new CounterColumn(name(), bb, timestamp(), timestampOfLastDelete);
- }
- }
-
- public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily cf, int gcBefore, int mergeBefore)
- {
- mergeAndRemoveOldShards(key, cf, gcBefore, mergeBefore, true);
- }
-
- /**
- * There is two phase to the removal of old shards.
- * First phase: we merge the old shard value to the current shard and
- * 'nulify' the old one. We then send the counter context with the old
- * shard nulified to all other replica.
- * Second phase: once an old shard has been nulified for longer than
- * gc_grace (to be sure all other replica had been aware of the merge), we
- * simply remove that old shard from the context (it's value is 0).
- * This method does both phases.
- * (Note that the sendToOtherReplica flag is here only to facilitate
- * testing. It should be true in real code so use the method above
- * preferably)
- */
- public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily cf, int gcBefore, int mergeBefore, boolean sendToOtherReplica)
- {
- ColumnFamily remoteMerger = null;
-
- for (Column c : cf)
- {
- if (!(c instanceof CounterColumn))
- continue;
- CounterColumn cc = (CounterColumn) c;
- CounterColumn shardMerger = cc.computeOldShardMerger(mergeBefore);
- CounterColumn merged = cc;
- if (shardMerger != null)
- {
- merged = (CounterColumn) cc.reconcile(shardMerger);
- if (remoteMerger == null)
- remoteMerger = cf.cloneMeShallow();
- remoteMerger.addColumn(merged);
- }
- CounterColumn cleaned = merged.removeOldShards(gcBefore);
- if (cleaned != cc)
- {
- cf.replace(cc, cleaned);
- }
- }
-
- if (remoteMerger != null && sendToOtherReplica)
- {
- try
- {
- sendToOtherReplica(key, remoteMerger);
- }
- catch (Exception e)
- {
- logger.error("Error while sending shard merger mutation to remote endpoints", e);
- }
- }
- }
-
- public Column markDeltaToBeCleared()
- {
- return new CounterColumn(name, contextManager.markDeltaToBeCleared(value), timestamp, timestampOfLastDelete);
- }
-
- private static void sendToOtherReplica(DecoratedKey key, ColumnFamily cf) throws RequestExecutionException
- {
- RowMutation rm = new RowMutation(cf.metadata().ksName, key.key, cf);
-
- final InetAddress local = FBUtilities.getBroadcastAddress();
- String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(local);
-
- StorageProxy.performWrite(rm, ConsistencyLevel.ANY, localDataCenter, new StorageProxy.WritePerformer()
- {
- public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
- throws OverloadedException
- {
- // We should only send to the remote replica, not the local one
- Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), ImmutableSet.of(local));
- // Fake local response to be a good lad but we won't wait on the responseHandler
- responseHandler.response(null);
- StorageProxy.sendToHintedEndpoints((RowMutation) mutation, remotes, responseHandler, localDataCenter);
- }
- }, null, WriteType.SIMPLE);
-
- // we don't wait for answers
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index eae8e12..f0942e2 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -129,7 +129,7 @@ public class CounterMutation implements IMutation
public void apply()
{
- // transform all CounterUpdateColumn to CounterColumn: accomplished by localCopy
+ // transform all CounterUpdateCell to CounterCell: accomplished by localCopy
RowMutation rm = new RowMutation(rowMutation.getKeyspaceName(), ByteBufferUtil.clone(rowMutation.key()));
Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
@@ -137,9 +137,9 @@ public class CounterMutation implements IMutation
{
ColumnFamily cf = cf_.cloneMeShallow();
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cf.id());
- for (Column column : cf_)
+ for (Cell cell : cf_)
{
- cf.addColumn(column.localCopy(cfs), HeapAllocator.instance);
+ cf.addColumn(cell.localCopy(cfs), HeapAllocator.instance);
}
rm.add(cf);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/CounterUpdateCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateCell.java b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
new file mode 100644
index 0000000..8acb854
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.HeapAllocator;
+
+/**
+ * A counter update while it hasn't been applied yet by the leader replica.
+ *
+ * Contains a single counter update. When applied by the leader replica, this
+ * is transformed to a relevant CounterCell. This Cell is a temporary data
+ * structure that should never be stored inside a memtable or an sstable.
+ */
+public class CounterUpdateCell extends Cell
+{
+ public CounterUpdateCell(CellName name, long value, long timestamp)
+ {
+ this(name, ByteBufferUtil.bytes(value), timestamp);
+ }
+
+ public CounterUpdateCell(CellName name, ByteBuffer value, long timestamp)
+ {
+ super(name, value, timestamp);
+ }
+
+ public long delta()
+ {
+ return value().getLong(value().position());
+ }
+
+ @Override
+ public Cell diff(Cell cell)
+ {
+ // Diff is used during reads, but we should never read those columns
+ throw new UnsupportedOperationException("This operation is unsupported on CounterUpdateCell.");
+ }
+
+ @Override
+ public Cell reconcile(Cell cell, Allocator allocator)
+ {
+ // The only time this could happen is if a batchAdd ships two
+ // increment for the same cell. Hence we simply sums the delta.
+
+ assert (cell instanceof CounterUpdateCell) || (cell instanceof DeletedCell) : "Wrong class type.";
+
+ // tombstones take precedence
+ if (cell.isMarkedForDelete(Long.MIN_VALUE)) // can't be an expired cell, so the current time is irrelevant
+ return timestamp() > cell.timestamp() ? this : cell;
+
+ // neither is tombstoned
+ CounterUpdateCell c = (CounterUpdateCell) cell;
+ return new CounterUpdateCell(name(), delta() + c.delta(), Math.max(timestamp(), c.timestamp()));
+ }
+
+ @Override
+ public int serializationFlags()
+ {
+ return ColumnSerializer.COUNTER_UPDATE_MASK;
+ }
+
+ @Override
+ public CounterCell localCopy(ColumnFamilyStore cfs)
+ {
+ return new CounterCell(name.copy(HeapAllocator.instance),
+ CounterContext.instance().create(delta(), HeapAllocator.instance),
+ timestamp(),
+ Long.MIN_VALUE);
+ }
+
+ @Override
+ public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
+ {
+ return new CounterCell(name.copy(allocator),
+ CounterContext.instance().create(delta(), allocator),
+ timestamp(),
+ Long.MIN_VALUE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
deleted file mode 100644
index aaf3307..0000000
--- a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.HeapAllocator;
-
-/**
- * A counter update while it hasn't been applied yet by the leader replica.
- *
- * Contains a single counter update. When applied by the leader replica, this
- * is transformed to a relevant CounterColumn. This Column is a temporary data
- * structure that should never be stored inside a memtable or an sstable.
- */
-public class CounterUpdateColumn extends Column
-{
- public CounterUpdateColumn(CellName name, long value, long timestamp)
- {
- this(name, ByteBufferUtil.bytes(value), timestamp);
- }
-
- public CounterUpdateColumn(CellName name, ByteBuffer value, long timestamp)
- {
- super(name, value, timestamp);
- }
-
- public long delta()
- {
- return value().getLong(value().position());
- }
-
- @Override
- public Column diff(Column column)
- {
- // Diff is used during reads, but we should never read those columns
- throw new UnsupportedOperationException("This operation is unsupported on CounterUpdateColumn.");
- }
-
- @Override
- public Column reconcile(Column column, Allocator allocator)
- {
- // The only time this could happen is if a batchAdd ships two
- // increment for the same column. Hence we simply sums the delta.
-
- assert (column instanceof CounterUpdateColumn) || (column instanceof DeletedColumn) : "Wrong class type.";
-
- // tombstones take precedence
- if (column.isMarkedForDelete(Long.MIN_VALUE)) // can't be an expired column, so the current time is irrelevant
- return timestamp() > column.timestamp() ? this : column;
-
- // neither is tombstoned
- CounterUpdateColumn c = (CounterUpdateColumn)column;
- return new CounterUpdateColumn(name(), delta() + c.delta(), Math.max(timestamp(), c.timestamp()));
- }
-
- @Override
- public int serializationFlags()
- {
- return ColumnSerializer.COUNTER_UPDATE_MASK;
- }
-
- @Override
- public CounterColumn localCopy(ColumnFamilyStore cfs)
- {
- return new CounterColumn(name.copy(HeapAllocator.instance),
- CounterContext.instance().create(delta(), HeapAllocator.instance),
- timestamp(),
- Long.MIN_VALUE);
- }
-
- @Override
- public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
- {
- return new CounterColumn(name.copy(allocator),
- CounterContext.instance().create(delta(), allocator),
- timestamp(),
- Long.MIN_VALUE);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index f30e256..693ef97 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -74,7 +74,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
*
* Where <key> is a name of keyspace e.g. "ks".
*
- * Column names where made composite to support 3-level nesting which represents following structure:
+ * Cell names where made composite to support 3-level nesting which represents following structure:
* "ColumnFamily name":"column name":"column attribute" => "value"
*
* Example of schema (using CLI):
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/DeletedCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedCell.java b/src/java/org/apache/cassandra/db/DeletedCell.java
new file mode 100644
index 0000000..5b89e1d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DeletedCell.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.HeapAllocator;
+
+public class DeletedCell extends Cell
+{
+ public DeletedCell(CellName name, int localDeletionTime, long timestamp)
+ {
+ this(name, ByteBufferUtil.bytes(localDeletionTime), timestamp);
+ }
+
+ public DeletedCell(CellName name, ByteBuffer value, long timestamp)
+ {
+ super(name, value, timestamp);
+ }
+
+ @Override
+ public Cell withUpdatedName(CellName newName)
+ {
+ return new DeletedCell(newName, value, timestamp);
+ }
+
+ @Override
+ public Cell withUpdatedTimestamp(long newTimestamp)
+ {
+ return new DeletedCell(name, value, newTimestamp);
+ }
+
+ @Override
+ public boolean isMarkedForDelete(long now)
+ {
+ return true;
+ }
+
+ @Override
+ public long getMarkedForDeleteAt()
+ {
+ return timestamp;
+ }
+
+ @Override
+ public void updateDigest(MessageDigest digest)
+ {
+ digest.update(name.toByteBuffer().duplicate());
+
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try
+ {
+ buffer.writeLong(timestamp);
+ buffer.writeByte(serializationFlags());
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ digest.update(buffer.getData(), 0, buffer.getLength());
+ }
+
+ @Override
+ public int getLocalDeletionTime()
+ {
+ return value.getInt(value.position());
+ }
+
+ @Override
+ public Cell reconcile(Cell cell, Allocator allocator)
+ {
+ if (cell instanceof DeletedCell)
+ return super.reconcile(cell, allocator);
+ return cell.reconcile(this, allocator);
+ }
+
+ @Override
+ public Cell localCopy(ColumnFamilyStore cfs)
+ {
+ return new DeletedCell(name.copy(HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp);
+ }
+
+ @Override
+ public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
+ {
+ return new DeletedCell(name.copy(allocator), allocator.clone(value), timestamp);
+ }
+
+ @Override
+ public int serializationFlags()
+ {
+ return ColumnSerializer.DELETION_MASK;
+ }
+
+ @Override
+ public void validateFields(CFMetaData metadata) throws MarshalException
+ {
+ validateName(metadata);
+ if (value().remaining() != 4)
+ throw new MarshalException("A tombstone value should be 4 bytes long");
+ if (getLocalDeletionTime() < 0)
+ throw new MarshalException("The local deletion time should not be negative");
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/DeletedColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedColumn.java b/src/java/org/apache/cassandra/db/DeletedColumn.java
deleted file mode 100644
index ec88015..0000000
--- a/src/java/org/apache/cassandra/db/DeletedColumn.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.HeapAllocator;
-
-public class DeletedColumn extends Column
-{
- public DeletedColumn(CellName name, int localDeletionTime, long timestamp)
- {
- this(name, ByteBufferUtil.bytes(localDeletionTime), timestamp);
- }
-
- public DeletedColumn(CellName name, ByteBuffer value, long timestamp)
- {
- super(name, value, timestamp);
- }
-
- @Override
- public Column withUpdatedName(CellName newName)
- {
- return new DeletedColumn(newName, value, timestamp);
- }
-
- @Override
- public Column withUpdatedTimestamp(long newTimestamp)
- {
- return new DeletedColumn(name, value, newTimestamp);
- }
-
- @Override
- public boolean isMarkedForDelete(long now)
- {
- return true;
- }
-
- @Override
- public long getMarkedForDeleteAt()
- {
- return timestamp;
- }
-
- @Override
- public void updateDigest(MessageDigest digest)
- {
- digest.update(name.toByteBuffer().duplicate());
-
- DataOutputBuffer buffer = new DataOutputBuffer();
- try
- {
- buffer.writeLong(timestamp);
- buffer.writeByte(serializationFlags());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- digest.update(buffer.getData(), 0, buffer.getLength());
- }
-
- @Override
- public int getLocalDeletionTime()
- {
- return value.getInt(value.position());
- }
-
- @Override
- public Column reconcile(Column column, Allocator allocator)
- {
- if (column instanceof DeletedColumn)
- return super.reconcile(column, allocator);
- return column.reconcile(this, allocator);
- }
-
- @Override
- public Column localCopy(ColumnFamilyStore cfs)
- {
- return new DeletedColumn(name.copy(HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp);
- }
-
- @Override
- public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
- {
- return new DeletedColumn(name.copy(allocator), allocator.clone(value), timestamp);
- }
-
- @Override
- public int serializationFlags()
- {
- return ColumnSerializer.DELETION_MASK;
- }
-
- @Override
- public void validateFields(CFMetaData metadata) throws MarshalException
- {
- validateName(metadata);
- if (value().remaining() != 4)
- throw new MarshalException("A tombstone value should be 4 bytes long");
- if (getLocalDeletionTime() < 0)
- throw new MarshalException("The local deletion time should not be negative");
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index 0bd0635..5c62132 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -106,14 +106,14 @@ public class DeletionInfo
}
/**
- * Return whether a given column is deleted by the container having this deletion info.
+ * Return whether a given cell is deleted by the container having this deletion info.
*
- * @param column the column to check.
- * @return true if the column is deleted, false otherwise
+ * @param cell the cell to check.
+ * @return true if the cell is deleted, false otherwise
*/
- public boolean isDeleted(Column column)
+ public boolean isDeleted(Cell cell)
{
- return isDeleted(column.name(), column.timestamp());
+ return isDeleted(cell.name(), cell.timestamp());
}
public boolean isDeleted(Composite name, long timestamp)
@@ -375,9 +375,9 @@ public class DeletionInfo
this.reversed = reversed;
}
- public boolean isDeleted(Column column)
+ public boolean isDeleted(Cell cell)
{
- return isDeleted(column.name(), column.timestamp());
+ return isDeleted(cell.name(), cell.timestamp());
}
public boolean isDeleted(Composite name, long timestamp)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/EmptyColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/EmptyColumns.java b/src/java/org/apache/cassandra/db/EmptyColumns.java
index 782ffc9..5498353 100644
--- a/src/java/org/apache/cassandra/db/EmptyColumns.java
+++ b/src/java/org/apache/cassandra/db/EmptyColumns.java
@@ -63,22 +63,22 @@ public class EmptyColumns extends AbstractThreadUnsafeSortedColumns
return factory;
}
- public void addColumn(Column column, Allocator allocator)
+ public void addColumn(Cell cell, Allocator allocator)
{
throw new UnsupportedOperationException();
}
- public void addAll(ColumnFamily cm, Allocator allocator, Function<Column, Column> transformation)
+ public void addAll(ColumnFamily cm, Allocator allocator, Function<Cell, Cell> transformation)
{
throw new UnsupportedOperationException();
}
- public boolean replace(Column oldColumn, Column newColumn)
+ public boolean replace(Cell oldCell, Cell newCell)
{
throw new UnsupportedOperationException();
}
- public Column getColumn(CellName name)
+ public Cell getColumn(CellName name)
{
throw new UnsupportedOperationException();
}
@@ -88,12 +88,12 @@ public class EmptyColumns extends AbstractThreadUnsafeSortedColumns
return Collections.emptyList();
}
- public Collection<Column> getSortedColumns()
+ public Collection<Cell> getSortedColumns()
{
return Collections.emptyList();
}
- public Collection<Column> getReverseSortedColumns()
+ public Collection<Cell> getReverseSortedColumns()
{
return Collections.emptyList();
}
@@ -103,12 +103,12 @@ public class EmptyColumns extends AbstractThreadUnsafeSortedColumns
return 0;
}
- public Iterator<Column> iterator(ColumnSlice[] slices)
+ public Iterator<Cell> iterator(ColumnSlice[] slices)
{
return Iterators.emptyIterator();
}
- public Iterator<Column> reverseIterator(ColumnSlice[] slices)
+ public Iterator<Cell> reverseIterator(ColumnSlice[] slices)
{
return Iterators.emptyIterator();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/ExpiringCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringCell.java b/src/java/org/apache/cassandra/db/ExpiringCell.java
new file mode 100644
index 0000000..2b9541c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ExpiringCell.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.HeapAllocator;
+
+/**
+ * Alternative to Cell that have an expiring time.
+ * ExpiringCell is immutable (as Cell is).
+ *
+ * Note that ExpiringCell does not override Cell.getMarkedForDeleteAt,
+ * which means that it's in the somewhat unintuitive position of being deleted (after its expiration)
+ * without having a time-at-which-it-became-deleted. (Because ttl is a server-side measurement,
+ * we can't mix it with the timestamp field, which is client-supplied and whose resolution we
+ * can't assume anything about.)
+ */
+public class ExpiringCell extends Cell
+{
+ public static final int MAX_TTL = 20 * 365 * 24 * 60 * 60; // 20 years in seconds
+
+ private final int localExpirationTime;
+ private final int timeToLive;
+
+ public ExpiringCell(CellName name, ByteBuffer value, long timestamp, int timeToLive)
+ {
+ this(name, value, timestamp, timeToLive, (int) (System.currentTimeMillis() / 1000) + timeToLive);
+ }
+
+ public ExpiringCell(CellName name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime)
+ {
+ super(name, value, timestamp);
+ assert timeToLive > 0 : timeToLive;
+ assert localExpirationTime > 0 : localExpirationTime;
+ this.timeToLive = timeToLive;
+ this.localExpirationTime = localExpirationTime;
+ }
+
+ /** @return Either a DeletedCell, or an ExpiringCell. */
+ public static Cell create(CellName name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore, ColumnSerializer.Flag flag)
+ {
+ if (localExpirationTime >= expireBefore || flag == ColumnSerializer.Flag.PRESERVE_SIZE)
+ return new ExpiringCell(name, value, timestamp, timeToLive, localExpirationTime);
+ // The column is now expired, we can safely return a simple tombstone. Note that
+ // as long as the expiring column and the tombstone put together live longer than GC grace seconds,
+ // we'll fulfil our responsibility to repair. See discussion at
+ // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
+ return new DeletedCell(name, localExpirationTime - timeToLive, timestamp);
+ }
+
+ public int getTimeToLive()
+ {
+ return timeToLive;
+ }
+
+ @Override
+ public Cell withUpdatedName(CellName newName)
+ {
+ return new ExpiringCell(newName, value, timestamp, timeToLive, localExpirationTime);
+ }
+
+ @Override
+ public Cell withUpdatedTimestamp(long newTimestamp)
+ {
+ return new ExpiringCell(name, value, newTimestamp, timeToLive, localExpirationTime);
+ }
+
+ @Override
+ public int dataSize()
+ {
+ return super.dataSize() + TypeSizes.NATIVE.sizeof(localExpirationTime) + TypeSizes.NATIVE.sizeof(timeToLive);
+ }
+
+ @Override
+ public int serializedSize(CellNameType type, TypeSizes typeSizes)
+ {
+ /*
+ * An expired column adds to a Cell :
+ * 4 bytes for the localExpirationTime
+ * + 4 bytes for the timeToLive
+ */
+ return super.serializedSize(type, typeSizes) + typeSizes.sizeof(localExpirationTime) + typeSizes.sizeof(timeToLive);
+ }
+
+ @Override
+ public void updateDigest(MessageDigest digest)
+ {
+ digest.update(name.toByteBuffer().duplicate());
+ digest.update(value.duplicate());
+
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try
+ {
+ buffer.writeLong(timestamp);
+ buffer.writeByte(serializationFlags());
+ buffer.writeInt(timeToLive);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ digest.update(buffer.getData(), 0, buffer.getLength());
+ }
+
+ @Override
+ public int getLocalDeletionTime()
+ {
+ return localExpirationTime;
+ }
+
+ @Override
+ public Cell localCopy(ColumnFamilyStore cfs)
+ {
+ return localCopy(cfs, HeapAllocator.instance);
+ }
+
+ @Override
+ public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
+ {
+ return new ExpiringCell(name.copy(allocator), allocator.clone(value), timestamp, timeToLive, localExpirationTime);
+ }
+
+ @Override
+ public String getString(CellNameType comparator)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(super.getString(comparator));
+ sb.append("!");
+ sb.append(timeToLive);
+ return sb.toString();
+ }
+
+ @Override
+ public boolean isMarkedForDelete(long now)
+ {
+ return (int) (now / 1000) >= getLocalDeletionTime();
+ }
+
+ @Override
+ public long getMarkedForDeleteAt()
+ {
+ return timestamp;
+ }
+
+ @Override
+ public int serializationFlags()
+ {
+ return ColumnSerializer.EXPIRATION_MASK;
+ }
+
+ @Override
+ public void validateFields(CFMetaData metadata) throws MarshalException
+ {
+ super.validateFields(metadata);
+ if (timeToLive <= 0)
+ throw new MarshalException("A column TTL should be > 0");
+ if (localExpirationTime < 0)
+ throw new MarshalException("The local expiration time should not be negative");
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ // super.equals() returns false if o is not a CounterCell
+ return super.equals(o)
+ && localExpirationTime == ((ExpiringCell)o).localExpirationTime
+ && timeToLive == ((ExpiringCell)o).timeToLive;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = super.hashCode();
+ result = 31 * result + localExpirationTime;
+ result = 31 * result + timeToLive;
+ return result;
+ }
+}