You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/04/29 14:05:08 UTC
[6/7] Push more of memtable data off-heap
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/BufferDecoratedKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferDecoratedKey.java b/src/java/org/apache/cassandra/db/BufferDecoratedKey.java
new file mode 100644
index 0000000..d375162
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BufferDecoratedKey.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.dht.Token;
+
+/**
+ * A {@link DecoratedKey} whose key bytes are held in a caller-supplied {@link ByteBuffer}.
+ */
+public class BufferDecoratedKey extends DecoratedKey
+{
+    private final ByteBuffer key;
+
+    /**
+     * @param token forwarded unchanged to the {@link DecoratedKey} constructor
+     * @param key   buffer holding the key; stored as-is (not copied). Must not be null, but this
+     *              is only enforced by an assertion, i.e. not checked when assertions are disabled.
+     */
+    public BufferDecoratedKey(Token token, ByteBuffer key)
+    {
+        super(token);
+        assert key != null;
+        this.key = key;
+    }
+
+    /**
+     * @return the same buffer instance that was handed to the constructor
+     */
+    @Override
+    public ByteBuffer getKey()
+    {
+        return key;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/BufferDeletedCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferDeletedCell.java b/src/java/org/apache/cassandra/db/BufferDeletedCell.java
new file mode 100644
index 0000000..a6518de
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BufferDeletedCell.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+
+public class BufferDeletedCell extends BufferCell implements DeletedCell
+{
+ public BufferDeletedCell(CellName name, int localDeletionTime, long timestamp)
+ {
+ this(name, ByteBufferUtil.bytes(localDeletionTime), timestamp);
+ }
+
+ public BufferDeletedCell(CellName name, ByteBuffer value, long timestamp)
+ {
+ super(name, value, timestamp);
+ }
+
+ @Override
+ public Cell withUpdatedName(CellName newName)
+ {
+ return new BufferDeletedCell(newName, value, timestamp);
+ }
+
+ @Override
+ public Cell withUpdatedTimestamp(long newTimestamp)
+ {
+ return new BufferDeletedCell(name, value, newTimestamp);
+ }
+
+ @Override
+ public boolean isMarkedForDelete(long now)
+ {
+ return true;
+ }
+
+ @Override
+ public long getMarkedForDeleteAt()
+ {
+ return timestamp;
+ }
+
+ @Override
+ public int getLocalDeletionTime()
+ {
+ return value().getInt(value.position());
+ }
+
+ @Override
+ public Cell reconcile(Cell cell)
+ {
+ if (cell instanceof DeletedCell)
+ return super.reconcile(cell);
+ return cell.reconcile(this);
+ }
+
+ @Override
+ public DeletedCell localCopy(CFMetaData metadata, AbstractAllocator allocator)
+ {
+ return new BufferDeletedCell(name.copy(metadata, allocator), allocator.clone(value), timestamp);
+ }
+
+ @Override
+ public DeletedCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
+ {
+ return allocator.clone(this, metadata, opGroup);
+ }
+
+ @Override
+ public int serializationFlags()
+ {
+ return ColumnSerializer.DELETION_MASK;
+ }
+
+ @Override
+ public void validateFields(CFMetaData metadata) throws MarshalException
+ {
+ validateName(metadata);
+ if (value().remaining() != 4)
+ throw new MarshalException("A tombstone value should be 4 bytes long");
+ if (getLocalDeletionTime() < 0)
+ throw new MarshalException("The local deletion time should not be negative");
+ }
+
+ public boolean equals(Cell cell)
+ {
+ return timestamp() == cell.timestamp() && getLocalDeletionTime() == cell.getLocalDeletionTime() && name().equals(cell.name());
+ }
+
+ @Override
+ public void updateDigest(MessageDigest digest)
+ {
+ digest.update(name().toByteBuffer().duplicate());
+
+ FBUtilities.updateWithLong(digest, timestamp());
+ FBUtilities.updateWithByte(digest, serializationFlags());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/BufferExpiringCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferExpiringCell.java b/src/java/org/apache/cassandra/db/BufferExpiringCell.java
new file mode 100644
index 0000000..95ed45a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BufferExpiringCell.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+
+public class BufferExpiringCell extends BufferCell implements ExpiringCell
+{
+ private final int localExpirationTime;
+ private final int timeToLive;
+
+ public BufferExpiringCell(CellName name, ByteBuffer value, long timestamp, int timeToLive)
+ {
+ this(name, value, timestamp, timeToLive, (int) (System.currentTimeMillis() / 1000) + timeToLive);
+ }
+
+ public BufferExpiringCell(CellName name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime)
+ {
+ super(name, value, timestamp);
+ assert timeToLive > 0 : timeToLive;
+ assert localExpirationTime > 0 : localExpirationTime;
+ this.timeToLive = timeToLive;
+ this.localExpirationTime = localExpirationTime;
+ }
+
+ public int getTimeToLive()
+ {
+ return timeToLive;
+ }
+
+ @Override
+ public Cell withUpdatedName(CellName newName)
+ {
+ return new BufferExpiringCell(newName, value(), timestamp(), timeToLive, localExpirationTime);
+ }
+
+ @Override
+ public Cell withUpdatedTimestamp(long newTimestamp)
+ {
+ return new BufferExpiringCell(name(), value(), newTimestamp, timeToLive, localExpirationTime);
+ }
+
+ @Override
+ public int cellDataSize()
+ {
+ return super.cellDataSize() + TypeSizes.NATIVE.sizeof(localExpirationTime) + TypeSizes.NATIVE.sizeof(timeToLive);
+ }
+
+ @Override
+ public int serializedSize(CellNameType type, TypeSizes typeSizes)
+ {
+ /*
+ * An expired column adds to a Cell :
+ * 4 bytes for the localExpirationTime
+ * + 4 bytes for the timeToLive
+ */
+ return super.serializedSize(type, typeSizes) + typeSizes.sizeof(localExpirationTime) + typeSizes.sizeof(timeToLive);
+ }
+
+ @Override
+ public void updateDigest(MessageDigest digest)
+ {
+ super.updateDigest(digest);
+ FBUtilities.updateWithInt(digest, timeToLive);
+ }
+
+ @Override
+ public int getLocalDeletionTime()
+ {
+ return localExpirationTime;
+ }
+
+ @Override
+ public ExpiringCell localCopy(CFMetaData metadata, AbstractAllocator allocator)
+ {
+ return new BufferExpiringCell(name.copy(metadata, allocator), allocator.clone(value), timestamp, timeToLive, localExpirationTime);
+ }
+
+ @Override
+ public ExpiringCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
+ {
+ return allocator.clone(this, metadata, opGroup);
+ }
+
+ @Override
+ public String getString(CellNameType comparator)
+ {
+ return String.format("%s!%d", super.getString(comparator), timeToLive);
+ }
+
+ @Override
+ public boolean isMarkedForDelete(long now)
+ {
+ return (int) (now / 1000) >= getLocalDeletionTime();
+ }
+
+ @Override
+ public long getMarkedForDeleteAt()
+ {
+ return timestamp;
+ }
+
+ @Override
+ public int serializationFlags()
+ {
+ return ColumnSerializer.EXPIRATION_MASK;
+ }
+
+ @Override
+ public void validateFields(CFMetaData metadata) throws MarshalException
+ {
+ super.validateFields(metadata);
+
+ if (timeToLive <= 0)
+ throw new MarshalException("A column TTL should be > 0");
+ if (localExpirationTime < 0)
+ throw new MarshalException("The local expiration time should not be negative");
+ }
+
+ @Override
+ public boolean equals(Cell cell)
+ {
+ return cell instanceof ExpiringCell && equals((ExpiringCell) cell);
+ }
+
+ public boolean equals(ExpiringCell cell)
+ {
+ // super.equals() returns false if o is not a CounterCell
+ return super.equals(cell)
+ && getLocalDeletionTime() == cell.getLocalDeletionTime()
+ && getTimeToLive() == cell.getTimeToLive();
+ }
+
+ /** @return Either a DeletedCell, or an ExpiringCell. */
+ public static Cell create(CellName name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore, ColumnSerializer.Flag flag)
+ {
+ if (localExpirationTime >= expireBefore || flag == ColumnSerializer.Flag.PRESERVE_SIZE)
+ return new BufferExpiringCell(name, value, timestamp, timeToLive, localExpirationTime);
+ // The column is now expired, we can safely return a simple tombstone. Note that
+ // as long as the expiring column and the tombstone put together live longer than GC grace seconds,
+ // we'll fulfil our responsibility to repair. See discussion at
+ // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
+ return new BufferDeletedCell(name, localExpirationTime - timeToLive, timestamp);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/CFRowAdder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CFRowAdder.java b/src/java/org/apache/cassandra/db/CFRowAdder.java
index f853d17..dfe49ee 100644
--- a/src/java/org/apache/cassandra/db/CFRowAdder.java
+++ b/src/java/org/apache/cassandra/db/CFRowAdder.java
@@ -52,7 +52,7 @@ public class CFRowAdder
// If a CQL3 table, add the row marker
if (cf.metadata().isCQL3Table() && !prefix.isStatic())
- cf.addColumn(new Cell(cf.getComparator().rowMarker(prefix), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
+ cf.addColumn(new BufferCell(cf.getComparator().rowMarker(prefix), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
}
public CFRowAdder add(String cql3ColumnName, Object value)
@@ -96,14 +96,14 @@ public class CFRowAdder
{
if (value == null)
{
- cf.addColumn(new DeletedCell(name, ldt, timestamp));
+ cf.addColumn(new BufferDeletedCell(name, ldt, timestamp));
}
else
{
AbstractType valueType = def.type.isCollection()
? ((CollectionType) def.type).valueComparator()
: def.type;
- cf.addColumn(new Cell(name, value instanceof ByteBuffer ? (ByteBuffer)value : valueType.decompose(value), timestamp));
+ cf.addColumn(new BufferCell(name, value instanceof ByteBuffer ? (ByteBuffer)value : valueType.decompose(value), timestamp));
}
return this;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/Cell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Cell.java b/src/java/org/apache/cassandra/db/Cell.java
index 8db9770..c19b5dd 100644
--- a/src/java/org/apache/cassandra/db/Cell.java
+++ b/src/java/org/apache/cassandra/db/Cell.java
@@ -17,261 +17,58 @@
*/
package org.apache.cassandra.db;
-import java.io.DataInput;
-import java.io.IOError;
-import java.io.IOException;
import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.util.Iterator;
-
-import com.google.common.collect.AbstractIterator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.CellNames;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
/**
* Cell is immutable, which prevents all kinds of confusion in a multithreaded environment.
*/
-public class Cell implements OnDiskAtom
+public interface Cell extends OnDiskAtom
{
public static final int MAX_NAME_LENGTH = FBUtilities.MAX_UNSIGNED_SHORT;
- private static final long EMPTY_SIZE = ObjectSizes.measure(new Cell(CellNames.simpleDense(ByteBuffer.allocate(1))));
-
- public static Iterator<OnDiskAtom> onDiskIterator(final DataInput in,
- final ColumnSerializer.Flag flag,
- final int expireBefore,
- final Descriptor.Version version,
- final CellNameType type)
- {
- return new AbstractIterator<OnDiskAtom>()
- {
- protected OnDiskAtom computeNext()
- {
- OnDiskAtom atom;
- try
- {
- atom = type.onDiskAtomSerializer().deserializeFromSSTable(in, flag, expireBefore, version);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- if (atom == null)
- return endOfData();
-
- return atom;
- }
- };
- }
-
- protected final CellName name;
- protected final ByteBuffer value;
- protected final long timestamp;
-
- Cell(CellName name)
- {
- this(name, ByteBufferUtil.EMPTY_BYTE_BUFFER);
- }
-
- public Cell(CellName name, ByteBuffer value)
- {
- this(name, value, 0);
- }
+ public Cell withUpdatedName(CellName newName);
- public Cell(CellName name, ByteBuffer value, long timestamp)
- {
- assert name != null;
- assert value != null;
- this.name = name;
- this.value = value;
- this.timestamp = timestamp;
- }
+ public Cell withUpdatedTimestamp(long newTimestamp);
- public Cell withUpdatedName(CellName newName)
- {
- return new Cell(newName, value, timestamp);
- }
-
- public Cell withUpdatedTimestamp(long newTimestamp)
- {
- return new Cell(name, value, newTimestamp);
- }
-
- public CellName name()
- {
- return name;
- }
-
- public ByteBuffer value()
- {
- return value;
- }
+ @Override
+ public CellName name();
- public long timestamp()
- {
- return timestamp;
- }
+ public ByteBuffer value();
- public boolean isMarkedForDelete(long now)
- {
- return false;
- }
+ public boolean isMarkedForDelete(long now);
- public boolean isLive(long now)
- {
- return !isMarkedForDelete(now);
- }
+ public boolean isLive(long now);
// Don't call unless the column is actually marked for delete.
- public long getMarkedForDeleteAt()
- {
- return Long.MAX_VALUE;
- }
+ public long getMarkedForDeleteAt();
- public int dataSize()
- {
- return name.dataSize() + value.remaining() + TypeSizes.NATIVE.sizeof(timestamp);
- }
+ public int cellDataSize();
// returns the size of the Cell and all references on the heap, excluding any costs associated with byte arrays
// that would be allocated by a localCopy, as these will be accounted for by the allocator
- public long excessHeapSizeExcludingData()
- {
- return EMPTY_SIZE + name.excessHeapSizeExcludingData() + ObjectSizes.sizeOnHeapExcludingData(value);
- }
-
- public int serializedSize(CellNameType type, TypeSizes typeSizes)
- {
- /*
- * Size of a column is =
- * size of a name (short + length of the string)
- * + 1 byte to indicate if the column has been deleted
- * + 8 bytes for timestamp
- * + 4 bytes which basically indicates the size of the byte array
- * + entire byte array.
- */
- int valueSize = value.remaining();
- return ((int)type.cellSerializer().serializedSize(name, typeSizes)) + 1 + typeSizes.sizeof(timestamp) + typeSizes.sizeof(valueSize) + valueSize;
- }
-
- public int serializationFlags()
- {
- return 0;
- }
-
- public Cell diff(Cell cell)
- {
- if (timestamp() < cell.timestamp())
- return cell;
- return null;
- }
-
- public void updateDigest(MessageDigest digest)
- {
- digest.update(name.toByteBuffer().duplicate());
- digest.update(value.duplicate());
-
- DataOutputBuffer buffer = new DataOutputBuffer();
- try
- {
- buffer.writeLong(timestamp);
- buffer.writeByte(serializationFlags());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- digest.update(buffer.getData(), 0, buffer.getLength());
- }
-
- public int getLocalDeletionTime()
- {
- return Integer.MAX_VALUE;
- }
-
- public Cell reconcile(Cell cell)
- {
- // tombstones take precedence. (if both are tombstones, then it doesn't matter which one we use.)
- if (isMarkedForDelete(System.currentTimeMillis()))
- return timestamp() < cell.timestamp() ? cell : this;
- if (cell.isMarkedForDelete(System.currentTimeMillis()))
- return timestamp() > cell.timestamp() ? this : cell;
- // break ties by comparing values.
- if (timestamp() == cell.timestamp())
- return value().compareTo(cell.value()) < 0 ? cell : this;
- // neither is tombstoned and timestamps are different
- return timestamp() < cell.timestamp() ? cell : this;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- Cell cell = (Cell)o;
-
- return timestamp == cell.timestamp && name.equals(cell.name) && value.equals(cell.value);
- }
-
- @Override
- public int hashCode()
- {
- int result = name != null ? name.hashCode() : 0;
- result = 31 * result + (value != null ? value.hashCode() : 0);
- result = 31 * result + (int)(timestamp ^ (timestamp >>> 32));
- return result;
- }
+ public long excessHeapSizeExcludingData();
- public Cell localCopy(AbstractAllocator allocator)
- {
- return new Cell(name.copy(allocator), allocator.clone(value), timestamp);
- }
+ public int serializedSize(CellNameType type, TypeSizes typeSizes);
+ public int serializationFlags();
- public String getString(CellNameType comparator)
- {
- return String.format("%s:%b:%d@%d",
- comparator.getString(name),
- isMarkedForDelete(System.currentTimeMillis()),
- value.remaining(),
- timestamp);
- }
+ public Cell diff(Cell cell);
- protected void validateName(CFMetaData metadata) throws MarshalException
- {
- metadata.comparator.validate(name());
- }
+ public Cell reconcile(Cell cell);
- public void validateFields(CFMetaData metadata) throws MarshalException
- {
- validateName(metadata);
+ public Cell localCopy(CFMetaData metadata, AbstractAllocator allocator);
- AbstractType<?> valueValidator = metadata.getValueValidator(name());
- if (valueValidator != null)
- valueValidator.validate(value());
- }
+ public Cell localCopy(CFMetaData metaData, MemtableAllocator allocator, OpOrder.Group opGroup);
- public static Cell create(CellName name, ByteBuffer value, long timestamp, int ttl, CFMetaData metadata)
- {
- if (ttl <= 0)
- ttl = metadata.getDefaultTimeToLive();
+ public String getString(CellNameType comparator);
- return ttl > 0
- ? new ExpiringCell(name, value, timestamp, ttl)
- : new Cell(name, value, timestamp);
- }
+ void validateName(CFMetaData metadata) throws MarshalException;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index 36a9ebf..f88c1a7 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -86,7 +86,7 @@ public class CollationController
{
OnDiskAtom atom = iter.next();
if (copyOnHeap)
- atom = ((Cell) atom).localCopy(HeapAllocator.instance);
+ atom = ((Cell) atom).localCopy(cfs.metadata, HeapAllocator.instance);
container.addAtom(atom);
}
}
@@ -147,7 +147,7 @@ public class CollationController
&& cfs.getCompactionStrategy() instanceof SizeTieredCompactionStrategy)
{
Tracing.trace("Defragmenting requested data");
- Mutation mutation = new Mutation(cfs.keyspace.getName(), filter.key.key, returnCF.cloneMe());
+ Mutation mutation = new Mutation(cfs.keyspace.getName(), filter.key.getKey(), returnCF.cloneMe());
// skipping commitlog and index updates is fine since we're just de-fragmenting existing data
Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
}
@@ -204,7 +204,7 @@ public class CollationController
ColumnFamily newCf = cf.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
for (Cell cell : cf)
{
- newCf.addColumn(cell.localCopy(HeapAllocator.instance));
+ newCf.addColumn(cell.localCopy(cfs.metadata, HeapAllocator.instance));
}
cf = newCf;
iter = filter.getColumnFamilyIterator(cf);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 4f85610..a261d73 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -128,23 +128,23 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
public void addColumn(CellName name, ByteBuffer value, long timestamp, int timeToLive)
{
assert !metadata().isCounter();
- Cell cell = Cell.create(name, value, timestamp, timeToLive, metadata());
+ Cell cell = AbstractCell.create(name, value, timestamp, timeToLive, metadata());
addColumn(cell);
}
public void addCounter(CellName name, long value)
{
- addColumn(new CounterUpdateCell(name, value, System.currentTimeMillis()));
+ addColumn(new BufferCounterUpdateCell(name, value, System.currentTimeMillis()));
}
public void addTombstone(CellName name, ByteBuffer localDeletionTime, long timestamp)
{
- addColumn(new DeletedCell(name, localDeletionTime, timestamp));
+ addColumn(new BufferDeletedCell(name, localDeletionTime, timestamp));
}
public void addTombstone(CellName name, int localDeletionTime, long timestamp)
{
- addColumn(new DeletedCell(name, localDeletionTime, timestamp));
+ addColumn(new BufferDeletedCell(name, localDeletionTime, timestamp));
}
public void addAtom(OnDiskAtom atom)
@@ -325,7 +325,7 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
{
long size = 0;
for (Cell cell : this)
- size += cell.dataSize();
+ size += cell.cellDataSize();
return size;
}
@@ -424,8 +424,8 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
int deletionTime = cell.getLocalDeletionTime();
if (deletionTime < Integer.MAX_VALUE)
tombstones.update(deletionTime);
- minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, cell.name, metadata.comparator);
- maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, cell.name, metadata.comparator);
+ minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, cell.name(), metadata.comparator);
+ maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, cell.name(), metadata.comparator);
if (cell instanceof CounterCell)
hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) cell).hasLegacyShards();
}
@@ -474,7 +474,7 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
{
ImmutableMap.Builder<CellName, ByteBuffer> builder = ImmutableMap.builder();
for (Cell cell : this)
- builder.put(cell.name, cell.value);
+ builder.put(cell.name(), cell.value());
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 6559b40..3b1c67e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -75,6 +75,7 @@ import org.apache.cassandra.streaming.StreamLockfile;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
@@ -843,12 +844,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
for (SecondaryIndex index : indexManager.getIndexes())
{
- if (index.getAllocator() != null)
+ if (index.getIndexCfs() != null)
{
- onHeapRatio += index.getAllocator().onHeap().ownershipRatio();
- offHeapRatio += index.getAllocator().offHeap().ownershipRatio();
- onHeapTotal += index.getAllocator().onHeap().owns();
- offHeapTotal += index.getAllocator().offHeap().owns();
+ MemtableAllocator allocator = index.getIndexCfs().getDataTracker().getView().getCurrentMemtable().getAllocator();
+ onHeapRatio += allocator.onHeap().ownershipRatio();
+ offHeapRatio += allocator.offHeap().ownershipRatio();
+ onHeapTotal += allocator.onHeap().owns();
+ offHeapTotal += allocator.offHeap().owns();
}
}
@@ -1095,10 +1097,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
for (SecondaryIndex index : cfs.indexManager.getIndexes())
{
- if (index.getAllocator() != null)
+ if (index.getIndexCfs() != null)
{
- onHeap += index.getAllocator().onHeap().ownershipRatio();
- offHeap += index.getAllocator().offHeap().ownershipRatio();
+ MemtableAllocator allocator = index.getIndexCfs().getDataTracker().getView().getCurrentMemtable().getAllocator();
+ onHeap += allocator.onHeap().ownershipRatio();
+ offHeap += allocator.offHeap().ownershipRatio();
}
}
@@ -1213,7 +1216,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// 2. if it has been re-added since then, this particular column was inserted before the last drop
private static boolean isDroppedColumn(Cell c, CFMetaData meta)
{
- Long droppedAt = meta.getDroppedColumns().get(c.name().cql3ColumnName());
+ Long droppedAt = meta.getDroppedColumns().get(c.name().cql3ColumnName(meta));
return droppedAt != null && c.timestamp() <= droppedAt;
}
@@ -1869,7 +1872,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
ColumnFamily columns;
try (OpOrder.Group op = readOrdering.start())
{
- columns = controller.getTopLevelColumns(Memtable.memoryPool.needToCopyOnHeap());
+ columns = controller.getTopLevelColumns(Memtable.MEMORY_POOL.needToCopyOnHeap());
}
metric.updateSSTableIterated(controller.getSstablesIterated());
return columns;
@@ -1882,7 +1885,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
{
DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key));
- if (key.cfId == metadata.cfId && !Range.isInRanges(dk.token, ranges))
+ if (key.cfId == metadata.cfId && !Range.isInRanges(dk.getToken(), ranges))
invalidateCachedRow(dk);
}
@@ -1891,7 +1894,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet())
{
DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey));
- if (key.cfId == metadata.cfId && !Range.isInRanges(dk.token, ranges))
+ if (key.cfId == metadata.cfId && !Range.isInRanges(dk.getToken(), ranges))
CacheService.instance.counterCache.remove(key);
}
}
@@ -1939,7 +1942,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return computeNext();
if (logger.isTraceEnabled())
- logger.trace("scanned {}", metadata.getKeyValidator().getString(key.key));
+ logger.trace("scanned {}", metadata.getKeyValidator().getString(key.getKey()));
return current;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/ColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java
index 8c22d71..8e7026c 100644
--- a/src/java/org/apache/cassandra/db/ColumnSerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnSerializer.java
@@ -116,7 +116,7 @@ public class ColumnSerializer implements ISerializer<Cell>
long timestampOfLastDelete = in.readLong();
long ts = in.readLong();
ByteBuffer value = ByteBufferUtil.readWithLength(in);
- return CounterCell.create(name, value, ts, timestampOfLastDelete, flag);
+ return BufferCounterCell.create(name, value, ts, timestampOfLastDelete, flag);
}
else if ((mask & EXPIRATION_MASK) != 0)
{
@@ -124,17 +124,17 @@ public class ColumnSerializer implements ISerializer<Cell>
int expiration = in.readInt();
long ts = in.readLong();
ByteBuffer value = ByteBufferUtil.readWithLength(in);
- return ExpiringCell.create(name, value, ts, ttl, expiration, expireBefore, flag);
+ return BufferExpiringCell.create(name, value, ts, ttl, expiration, expireBefore, flag);
}
else
{
long ts = in.readLong();
ByteBuffer value = ByteBufferUtil.readWithLength(in);
return (mask & COUNTER_UPDATE_MASK) != 0
- ? new CounterUpdateCell(name, value, ts)
+ ? new BufferCounterUpdateCell(name, value, ts)
: ((mask & DELETION_MASK) == 0
- ? new Cell(name, value, ts)
- : new DeletedCell(name, value, ts));
+ ? new BufferCell(name, value, ts)
+ : new BufferDeletedCell(name, value, ts));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/CounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterCell.java b/src/java/org/apache/cassandra/db/CounterCell.java
index fc4ac3f..cda1200 100644
--- a/src/java/org/apache/cassandra/db/CounterCell.java
+++ b/src/java/org/apache/cassandra/db/CounterCell.java
@@ -17,223 +17,28 @@
*/
package org.apache.cassandra.db;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
/**
* A column that represents a partitioned counter.
*/
-public class CounterCell extends Cell
+public interface CounterCell extends Cell
{
- protected static final CounterContext contextManager = CounterContext.instance();
-
- private final long timestampOfLastDelete;
-
- public CounterCell(CellName name, ByteBuffer value, long timestamp)
- {
- this(name, value, timestamp, Long.MIN_VALUE);
- }
-
- public CounterCell(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete)
- {
- super(name, value, timestamp);
- this.timestampOfLastDelete = timestampOfLastDelete;
- }
-
- public static CounterCell create(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete, ColumnSerializer.Flag flag)
- {
- if (flag == ColumnSerializer.Flag.FROM_REMOTE || (flag == ColumnSerializer.Flag.LOCAL && contextManager.shouldClearLocal(value)))
- value = contextManager.clearAllLocal(value);
- return new CounterCell(name, value, timestamp, timestampOfLastDelete);
- }
-
- // For use by tests of compatibility with pre-2.1 counter only.
- public static CounterCell createLocal(CellName name, long value, long timestamp, long timestampOfLastDelete)
- {
- return new CounterCell(name, contextManager.createLocal(value), timestamp, timestampOfLastDelete);
- }
-
- @Override
- public Cell withUpdatedName(CellName newName)
- {
- return new CounterCell(newName, value, timestamp, timestampOfLastDelete);
- }
-
- public long timestampOfLastDelete()
- {
- return timestampOfLastDelete;
- }
-
- public long total()
- {
- return contextManager.total(value);
- }
-
- @Override
- public int dataSize()
- {
- // A counter column adds 8 bytes for timestampOfLastDelete to Cell.
- return super.dataSize() + TypeSizes.NATIVE.sizeof(timestampOfLastDelete);
- }
-
- @Override
- public int serializedSize(CellNameType type, TypeSizes typeSizes)
- {
- return super.serializedSize(type, typeSizes) + typeSizes.sizeof(timestampOfLastDelete);
- }
-
- @Override
- public Cell diff(Cell cell)
- {
- assert (cell instanceof CounterCell) || (cell instanceof DeletedCell) : "Wrong class type: " + cell.getClass();
-
- if (timestamp() < cell.timestamp())
- return cell;
-
- // Note that if at that point, cell can't be a tombstone. Indeed,
- // cell is the result of merging us with other nodes results, and
- // merging a CounterCell with a tombstone never return a tombstone
- // unless that tombstone timestamp is greater that the CounterCell
- // one.
- assert cell instanceof CounterCell : "Wrong class type: " + cell.getClass();
-
- if (timestampOfLastDelete() < ((CounterCell) cell).timestampOfLastDelete())
- return cell;
- CounterContext.Relationship rel = contextManager.diff(cell.value(), value());
- if (rel == CounterContext.Relationship.GREATER_THAN || rel == CounterContext.Relationship.DISJOINT)
- return cell;
- return null;
- }
-
- /*
- * We have to special case digest creation for counter column because
- * we don't want to include the information about which shard of the
- * context is a delta or not, since this information differs from node to
- * node.
- */
- @Override
- public void updateDigest(MessageDigest digest)
- {
- digest.update(name.toByteBuffer().duplicate());
- // We don't take the deltas into account in a digest
- contextManager.updateDigest(digest, value);
- DataOutputBuffer buffer = new DataOutputBuffer();
- try
- {
- buffer.writeLong(timestamp);
- buffer.writeByte(serializationFlags());
- buffer.writeLong(timestampOfLastDelete);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- digest.update(buffer.getData(), 0, buffer.getLength());
- }
-
- @Override
- public Cell reconcile(Cell cell)
- {
- // live + tombstone: track last tombstone
- if (cell.isMarkedForDelete(Long.MIN_VALUE)) // cannot be an expired cell, so the current time is irrelevant
- {
- // live < tombstone
- if (timestamp() < cell.timestamp())
- {
- return cell;
- }
- // live last delete >= tombstone
- if (timestampOfLastDelete() >= cell.timestamp())
- {
- return this;
- }
- // live last delete < tombstone
- return new CounterCell(name(), value(), timestamp(), cell.timestamp());
- }
-
- assert cell instanceof CounterCell : "Wrong class type: " + cell.getClass();
-
- // live < live last delete
- if (timestamp() < ((CounterCell) cell).timestampOfLastDelete())
- return cell;
- // live last delete > live
- if (timestampOfLastDelete() > cell.timestamp())
- return this;
-
- // live + live. return one of the cells if its context is a superset of the other's, or merge them otherwise
- ByteBuffer context = contextManager.merge(value(), cell.value());
- if (context == value() && timestamp() >= cell.timestamp() && timestampOfLastDelete() >= ((CounterCell) cell).timestampOfLastDelete())
- return this;
- else if (context == cell.value() && cell.timestamp() >= timestamp() && ((CounterCell) cell).timestampOfLastDelete() >= timestampOfLastDelete())
- return cell;
- else // merge clocks and timsestamps.
- return new CounterCell(name(),
- context,
- Math.max(timestamp(), cell.timestamp()),
- Math.max(timestampOfLastDelete(), ((CounterCell) cell).timestampOfLastDelete()));
- }
-
- public boolean hasLegacyShards()
- {
- return contextManager.hasLegacyShards(value);
- }
-
- @Override
- public boolean equals(Object o)
- {
- // super.equals() returns false if o is not a CounterCell
- return super.equals(o) && timestampOfLastDelete == ((CounterCell)o).timestampOfLastDelete;
- }
+ static final CounterContext contextManager = CounterContext.instance();
- @Override
- public int hashCode()
- {
- return 31 * super.hashCode() + (int)(timestampOfLastDelete ^ (timestampOfLastDelete >>> 32));
- }
+ public long timestampOfLastDelete();
- @Override
- public Cell localCopy(AbstractAllocator allocator)
- {
- return new CounterCell(name.copy(allocator), allocator.clone(value), timestamp, timestampOfLastDelete);
- }
+ public long total();
- @Override
- public String getString(CellNameType comparator)
- {
- return String.format("%s:false:%s@%d!%d",
- comparator.getString(name),
- contextManager.toString(value),
- timestamp,
- timestampOfLastDelete);
- }
+ public boolean hasLegacyShards();
- @Override
- public int serializationFlags()
- {
- return ColumnSerializer.COUNTER_MASK;
- }
+ public Cell markLocalToBeCleared();
- @Override
- public void validateFields(CFMetaData metadata) throws MarshalException
- {
- validateName(metadata);
- // We cannot use the value validator as for other columns as the CounterColumnType validate a long,
- // which is not the internal representation of counters
- contextManager.validateContext(value());
- }
+ CounterCell localCopy(CFMetaData metadata, AbstractAllocator allocator);
- public Cell markLocalToBeCleared()
- {
- ByteBuffer marked = contextManager.markLocalToBeCleared(value);
- return marked == value ? this : new CounterCell(name, marked, timestamp, timestampOfLastDelete);
- }
+ CounterCell localCopy(CFMetaData metaData, MemtableAllocator allocator, OpOrder.Group opGroup);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index c19b436..32571cc 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -200,9 +200,9 @@ public class CounterMutation implements IMutation
long clock = currentValue.clock + 1L;
long count = currentValue.count + update.delta();
- resultCF.addColumn(new CounterCell(update.name(),
- CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count),
- update.timestamp()));
+ resultCF.addColumn(new BufferCounterCell(update.name(),
+ CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count),
+ update.timestamp()));
}
return resultCF;
@@ -253,7 +253,7 @@ public class CounterMutation implements IMutation
SortedSet<CellName> names = new TreeSet<>(cfs.metadata.comparator);
for (int i = 0; i < currentValues.length; i++)
if (currentValues[i] == null)
- names.add(counterUpdateCells.get(i).name);
+ names.add(counterUpdateCells.get(i).name());
ReadCommand cmd = new SliceByNamesReadCommand(getKeyspaceName(), key(), cfs.metadata.cfName, Long.MIN_VALUE, new NamesQueryFilter(names));
Row row = cmd.getRow(cfs.keyspace);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/CounterUpdateCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateCell.java b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
index 27d5270..58ac365 100644
--- a/src/java/org/apache/cassandra/db/CounterUpdateCell.java
+++ b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
@@ -17,13 +17,6 @@
*/
package org.apache.cassandra.db;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.db.composites.CellNameType;
-
/**
* A counter update while it hasn't been applied yet by the leader replica.
*
@@ -31,61 +24,7 @@ import org.apache.cassandra.db.composites.CellNameType;
* is transformed to a relevant CounterCell. This Cell is a temporary data
* structure that should never be stored inside a memtable or an sstable.
*/
-public class CounterUpdateCell extends Cell
+public interface CounterUpdateCell extends Cell
{
- public CounterUpdateCell(CellName name, long value, long timestamp)
- {
- this(name, ByteBufferUtil.bytes(value), timestamp);
- }
-
- public CounterUpdateCell(CellName name, ByteBuffer value, long timestamp)
- {
- super(name, value, timestamp);
- }
-
- public long delta()
- {
- return value().getLong(value().position());
- }
-
- @Override
- public Cell diff(Cell cell)
- {
- // Diff is used during reads, but we should never read those columns
- throw new UnsupportedOperationException("This operation is unsupported on CounterUpdateCell.");
- }
-
- @Override
- public Cell reconcile(Cell cell)
- {
- // The only time this could happen is if a batchAdd ships two
- // increment for the same cell. Hence we simply sums the delta.
-
- // tombstones take precedence
- if (cell.isMarkedForDelete(Long.MIN_VALUE)) // can't be an expired cell, so the current time is irrelevant
- return timestamp() > cell.timestamp() ? this : cell;
-
- // neither is tombstoned
- assert cell instanceof CounterUpdateCell : "Wrong class type.";
- CounterUpdateCell c = (CounterUpdateCell) cell;
- return new CounterUpdateCell(name(), delta() + c.delta(), Math.max(timestamp(), c.timestamp()));
- }
-
- @Override
- public int serializationFlags()
- {
- return ColumnSerializer.COUNTER_UPDATE_MASK;
- }
-
- @Override
- public Cell localCopy(AbstractAllocator allocator)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String getString(CellNameType comparator)
- {
- return String.format("%s:%s@%d", comparator.getString(name), ByteBufferUtil.toLong(value), timestamp);
- }
+ public long delta();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index 91ff512..31a9370 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -163,7 +163,7 @@ public class DataRange
private boolean equals(RowPosition pos, ByteBuffer rowKey)
{
- return pos instanceof DecoratedKey && ((DecoratedKey)pos).key.equals(rowKey);
+ return pos instanceof DecoratedKey && ((DecoratedKey)pos).getKey().equals(rowKey);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/DecoratedKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DecoratedKey.java b/src/java/org/apache/cassandra/db/DecoratedKey.java
index 8f7a22b..fb3d7ab 100644
--- a/src/java/org/apache/cassandra/db/DecoratedKey.java
+++ b/src/java/org/apache/cassandra/db/DecoratedKey.java
@@ -22,6 +22,7 @@ import java.util.Comparator;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
@@ -33,7 +34,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
* if this matters, you can subclass RP to use a stronger hash, or use a non-lossy tokenization scheme (as in the
* OrderPreservingPartitioner classes).
*/
-public class DecoratedKey extends RowPosition
+public abstract class DecoratedKey implements RowPosition
{
public static final Comparator<DecoratedKey> comparator = new Comparator<DecoratedKey>()
{
@@ -43,20 +44,18 @@ public class DecoratedKey extends RowPosition
}
};
- public final Token token;
- public final ByteBuffer key;
+ private final Token token;
- public DecoratedKey(Token token, ByteBuffer key)
+ public DecoratedKey(Token token)
{
- assert token != null && key != null;
+ assert token != null;
this.token = token;
- this.key = key;
}
@Override
public int hashCode()
{
- return key.hashCode(); // hash of key is enough
+ return getKey().hashCode(); // hash of key is enough
}
@Override
@@ -64,12 +63,11 @@ public class DecoratedKey extends RowPosition
{
if (this == obj)
return true;
- if (obj == null || this.getClass() != obj.getClass())
+ if (obj == null || !(obj instanceof DecoratedKey))
return false;
DecoratedKey other = (DecoratedKey)obj;
-
- return ByteBufferUtil.compareUnsigned(key, other.key) == 0; // we compare faster than BB.equals for array backed BB
+ return ByteBufferUtil.compareUnsigned(getKey(), other.getKey()) == 0; // we compare faster than BB.equals for array backed BB
}
public int compareTo(RowPosition pos)
@@ -82,8 +80,8 @@ public class DecoratedKey extends RowPosition
return -pos.compareTo(this);
DecoratedKey otherKey = (DecoratedKey) pos;
- int cmp = token.compareTo(otherKey.getToken());
- return cmp == 0 ? ByteBufferUtil.compareUnsigned(key, otherKey.key) : cmp;
+ int cmp = getToken().compareTo(otherKey.getToken());
+ return cmp == 0 ? ByteBufferUtil.compareUnsigned(getKey(), otherKey.getKey()) : cmp;
}
public static int compareTo(IPartitioner partitioner, ByteBuffer key, RowPosition position)
@@ -94,7 +92,7 @@ public class DecoratedKey extends RowPosition
DecoratedKey otherKey = (DecoratedKey) position;
int cmp = partitioner.getToken(key).compareTo(otherKey.getToken());
- return cmp == 0 ? ByteBufferUtil.compareUnsigned(key, otherKey.key) : cmp;
+ return cmp == 0 ? ByteBufferUtil.compareUnsigned(key, otherKey.getKey()) : cmp;
}
public boolean isMinimum(IPartitioner partitioner)
@@ -103,6 +101,11 @@ public class DecoratedKey extends RowPosition
return false;
}
+ public boolean isMinimum()
+ {
+ return isMinimum(StorageService.getPartitioner());
+ }
+
public RowPosition.Kind kind()
{
return RowPosition.Kind.ROW_KEY;
@@ -111,12 +114,14 @@ public class DecoratedKey extends RowPosition
@Override
public String toString()
{
- String keystring = key == null ? "null" : ByteBufferUtil.bytesToHex(key);
- return "DecoratedKey(" + token + ", " + keystring + ")";
+ String keystring = getKey() == null ? "null" : ByteBufferUtil.bytesToHex(getKey());
+ return "DecoratedKey(" + getToken() + ", " + keystring + ")";
}
public Token getToken()
{
return token;
}
+
+ public abstract ByteBuffer getKey();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index 351ee4b..6f9a270 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -248,7 +248,7 @@ public class DefsTables
if (newState.hasColumns())
updateKeyspace(KSMetaData.fromSchema(new Row(key, newState), Collections.<CFMetaData>emptyList(), new UTMetaData()));
else
- keyspacesToDrop.add(AsciiType.instance.getString(key.key));
+ keyspacesToDrop.add(AsciiType.instance.getString(key.getKey()));
}
return keyspacesToDrop;
@@ -297,7 +297,7 @@ public class DefsTables
}
else // has modifications in the nested ColumnFamilies, need to perform nested diff to determine what was really changed
{
- String ksName = AsciiType.instance.getString(keyspace.key);
+ String ksName = AsciiType.instance.getString(keyspace.getKey());
Map<String, CFMetaData> oldCfDefs = new HashMap<String, CFMetaData>();
for (CFMetaData cfm : Schema.instance.getKSMetaData(ksName).cfMetaData().values())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/DeletedCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedCell.java b/src/java/org/apache/cassandra/db/DeletedCell.java
index 00788f8..998c409 100644
--- a/src/java/org/apache/cassandra/db/DeletedCell.java
+++ b/src/java/org/apache/cassandra/db/DeletedCell.java
@@ -17,104 +17,14 @@
*/
package org.apache.cassandra.db;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
-public class DeletedCell extends Cell
+public interface DeletedCell extends Cell
{
- public DeletedCell(CellName name, int localDeletionTime, long timestamp)
- {
- this(name, ByteBufferUtil.bytes(localDeletionTime), timestamp);
- }
-
- public DeletedCell(CellName name, ByteBuffer value, long timestamp)
- {
- super(name, value, timestamp);
- }
-
- @Override
- public Cell withUpdatedName(CellName newName)
- {
- return new DeletedCell(newName, value, timestamp);
- }
-
- @Override
- public Cell withUpdatedTimestamp(long newTimestamp)
- {
- return new DeletedCell(name, value, newTimestamp);
- }
-
- @Override
- public boolean isMarkedForDelete(long now)
- {
- return true;
- }
-
- @Override
- public long getMarkedForDeleteAt()
- {
- return timestamp;
- }
-
- @Override
- public void updateDigest(MessageDigest digest)
- {
- digest.update(name.toByteBuffer().duplicate());
-
- DataOutputBuffer buffer = new DataOutputBuffer();
- try
- {
- buffer.writeLong(timestamp);
- buffer.writeByte(serializationFlags());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- digest.update(buffer.getData(), 0, buffer.getLength());
- }
-
- @Override
- public int getLocalDeletionTime()
- {
- return value.getInt(value.position());
- }
-
- @Override
- public Cell reconcile(Cell cell)
- {
- if (cell instanceof DeletedCell)
- return super.reconcile(cell);
- return cell.reconcile(this);
- }
-
- @Override
- public Cell localCopy(AbstractAllocator allocator)
- {
- return new DeletedCell(name.copy(allocator), allocator.clone(value), timestamp);
- }
-
- @Override
- public int serializationFlags()
- {
- return ColumnSerializer.DELETION_MASK;
- }
+ DeletedCell localCopy(CFMetaData metadata, AbstractAllocator allocator);
- @Override
- public void validateFields(CFMetaData metadata) throws MarshalException
- {
- validateName(metadata);
- if (value().remaining() != 4)
- throw new MarshalException("A tombstone value should be 4 bytes long");
- if (getLocalDeletionTime() < 0)
- throw new MarshalException("The local deletion time should not be negative");
- }
+ DeletedCell localCopy(CFMetaData metaData, MemtableAllocator allocator, OpOrder.Group opGroup);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/ExpiringCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringCell.java b/src/java/org/apache/cassandra/db/ExpiringCell.java
index 57fac5b..5fc0f94 100644
--- a/src/java/org/apache/cassandra/db/ExpiringCell.java
+++ b/src/java/org/apache/cassandra/db/ExpiringCell.java
@@ -17,16 +17,10 @@
*/
package org.apache.cassandra.db;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
/**
* Alternative to Cell that have an expiring time.
@@ -38,154 +32,13 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
* we can't mix it with the timestamp field, which is client-supplied and whose resolution we
* can't assume anything about.)
*/
-public class ExpiringCell extends Cell
+public interface ExpiringCell extends Cell
{
public static final int MAX_TTL = 20 * 365 * 24 * 60 * 60; // 20 years in seconds
- private final int localExpirationTime;
- private final int timeToLive;
-
- public ExpiringCell(CellName name, ByteBuffer value, long timestamp, int timeToLive)
- {
- this(name, value, timestamp, timeToLive, (int) (System.currentTimeMillis() / 1000) + timeToLive);
- }
-
- public ExpiringCell(CellName name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime)
- {
- super(name, value, timestamp);
- assert timeToLive > 0 : timeToLive;
- assert localExpirationTime > 0 : localExpirationTime;
- this.timeToLive = timeToLive;
- this.localExpirationTime = localExpirationTime;
- }
-
- /** @return Either a DeletedCell, or an ExpiringCell. */
- public static Cell create(CellName name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore, ColumnSerializer.Flag flag)
- {
- if (localExpirationTime >= expireBefore || flag == ColumnSerializer.Flag.PRESERVE_SIZE)
- return new ExpiringCell(name, value, timestamp, timeToLive, localExpirationTime);
- // The column is now expired, we can safely return a simple tombstone. Note that
- // as long as the expiring column and the tombstone put together live longer than GC grace seconds,
- // we'll fulfil our responsibility to repair. See discussion at
- // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
- return new DeletedCell(name, localExpirationTime - timeToLive, timestamp);
- }
-
- public int getTimeToLive()
- {
- return timeToLive;
- }
-
- @Override
- public Cell withUpdatedName(CellName newName)
- {
- return new ExpiringCell(newName, value, timestamp, timeToLive, localExpirationTime);
- }
-
- @Override
- public Cell withUpdatedTimestamp(long newTimestamp)
- {
- return new ExpiringCell(name, value, newTimestamp, timeToLive, localExpirationTime);
- }
-
- @Override
- public int dataSize()
- {
- return super.dataSize() + TypeSizes.NATIVE.sizeof(localExpirationTime) + TypeSizes.NATIVE.sizeof(timeToLive);
- }
-
- @Override
- public int serializedSize(CellNameType type, TypeSizes typeSizes)
- {
- /*
- * An expired column adds to a Cell :
- * 4 bytes for the localExpirationTime
- * + 4 bytes for the timeToLive
- */
- return super.serializedSize(type, typeSizes) + typeSizes.sizeof(localExpirationTime) + typeSizes.sizeof(timeToLive);
- }
-
- @Override
- public void updateDigest(MessageDigest digest)
- {
- digest.update(name.toByteBuffer().duplicate());
- digest.update(value.duplicate());
-
- DataOutputBuffer buffer = new DataOutputBuffer();
- try
- {
- buffer.writeLong(timestamp);
- buffer.writeByte(serializationFlags());
- buffer.writeInt(timeToLive);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- digest.update(buffer.getData(), 0, buffer.getLength());
- }
-
- @Override
- public int getLocalDeletionTime()
- {
- return localExpirationTime;
- }
-
- @Override
- public Cell localCopy(AbstractAllocator allocator)
- {
- return new ExpiringCell(name.copy(allocator), allocator.clone(value), timestamp, timeToLive, localExpirationTime);
- }
-
- @Override
- public String getString(CellNameType comparator)
- {
- return String.format("%s!%d", super.getString(comparator), timeToLive);
- }
-
- @Override
- public boolean isMarkedForDelete(long now)
- {
- return (int) (now / 1000) >= getLocalDeletionTime();
- }
-
- @Override
- public long getMarkedForDeleteAt()
- {
- return timestamp;
- }
-
- @Override
- public int serializationFlags()
- {
- return ColumnSerializer.EXPIRATION_MASK;
- }
-
- @Override
- public void validateFields(CFMetaData metadata) throws MarshalException
- {
- super.validateFields(metadata);
- if (timeToLive <= 0)
- throw new MarshalException("A column TTL should be > 0");
- if (localExpirationTime < 0)
- throw new MarshalException("The local expiration time should not be negative");
- }
+ public int getTimeToLive();
- @Override
- public boolean equals(Object o)
- {
- // super.equals() returns false if o is not a CounterCell
- return super.equals(o)
- && localExpirationTime == ((ExpiringCell)o).localExpirationTime
- && timeToLive == ((ExpiringCell)o).timeToLive;
- }
+ ExpiringCell localCopy(CFMetaData metadata, AbstractAllocator allocator);
- @Override
- public int hashCode()
- {
- int result = super.hashCode();
- result = 31 * result + localExpirationTime;
- result = 31 * result + timeToLive;
- return result;
- }
+ ExpiringCell localCopy(CFMetaData metaData, MemtableAllocator allocator, OpOrder.Group opGroup);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 4415e63..8c892d6 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -514,7 +514,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE, System.currentTimeMillis());
for (Row row : rows)
{
- UUID hostId = UUIDGen.getUUID(row.key.key);
+ UUID hostId = UUIDGen.getUUID(row.key.getKey());
InetAddress target = StorageService.instance.getTokenMetadata().getEndpointForHostId(hostId);
// token may have since been removed (in which case we have just read back a tombstone)
if (target != null)
@@ -573,7 +573,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
for (Row row : getHintsSlice(1))
{
if (row.cf != null) //ignore removed rows
- result.addFirst(tokenFactory.toString(row.key.token));
+ result.addFirst(tokenFactory.toString(row.key.getToken()));
}
return result;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 31e68c1..17d1364 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -410,13 +410,13 @@ public class Keyspace
public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, Set<String> idxNames)
{
if (logger.isDebugEnabled())
- logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.key));
+ logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.getKey()));
try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start())
{
Set<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
- Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.key, DEFAULT_PAGE_SIZE);
+ Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.getKey(), DEFAULT_PAGE_SIZE);
while (pager.hasNext())
{
ColumnFamily cf = pager.next();
@@ -426,7 +426,7 @@ public class Keyspace
if (cfs.indexManager.indexes(cell.name(), indexes))
cf2.addColumn(cell);
}
- cfs.indexManager.indexRow(key.key, cf2, opGroup);
+ cfs.indexManager.indexRow(key.getKey(), cf2, opGroup);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 4cf6654..6f4c1c7 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -46,19 +46,16 @@ import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.ContextAllocator;
-import org.apache.cassandra.utils.memory.HeapAllocator;
-import org.apache.cassandra.utils.memory.Pool;
-import org.apache.cassandra.utils.memory.PoolAllocator;
+import org.apache.cassandra.utils.memory.*;
public class Memtable
{
private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
- static final Pool memoryPool = DatabaseDescriptor.getMemtableAllocatorPool();
+ static final MemtablePool MEMORY_POOL = DatabaseDescriptor.getMemtableAllocatorPool();
private static final int ROW_OVERHEAD_HEAP_SIZE;
- private final PoolAllocator allocator;
+ private final MemtableAllocator allocator;
private final AtomicLong liveDataSize = new AtomicLong(0);
private final AtomicLong currentOperations = new AtomicLong(0);
@@ -85,12 +82,12 @@ public class Memtable
public Memtable(ColumnFamilyStore cfs)
{
this.cfs = cfs;
- this.allocator = memoryPool.newAllocator();
+ this.allocator = MEMORY_POOL.newAllocator();
this.initialComparator = cfs.metadata.comparator;
this.cfs.scheduleFlush();
}
- public PoolAllocator getAllocator()
+ public MemtableAllocator getAllocator()
{
return allocator;
}
@@ -177,7 +174,7 @@ public class Memtable
if (previous == null)
{
AtomicBTreeColumns empty = cf.cloneMeShallow(AtomicBTreeColumns.factory, false);
- final DecoratedKey cloneKey = new DecoratedKey(key.token, allocator.clone(key.key, opGroup));
+ final DecoratedKey cloneKey = allocator.clone(key, opGroup);
// We'll add the columns later. This avoids wasting works if we get beaten in the putIfAbsent
previous = rows.putIfAbsent(cloneKey, empty);
if (previous == null)
@@ -185,27 +182,17 @@ public class Memtable
previous = empty;
// allocate the row overhead after the fact; this saves over allocating and having to free after, but
// means we can overshoot our declared limit.
- int overhead = (int) (cfs.partitioner.getHeapSizeOf(key.token) + ROW_OVERHEAD_HEAP_SIZE);
- allocator.allocate(overhead, opGroup);
+ int overhead = (int) (cfs.partitioner.getHeapSizeOf(key.getToken()) + ROW_OVERHEAD_HEAP_SIZE);
+ allocator.onHeap().allocate(overhead, opGroup);
}
else
{
- allocator.free(cloneKey.key);
+ allocator.reclaimer().reclaimImmediately(cloneKey);
}
}
- ContextAllocator contextAllocator = allocator.wrap(opGroup);
- AtomicBTreeColumns.Delta delta = previous.addAllWithSizeDelta(cf, contextAllocator, indexer, new AtomicBTreeColumns.Delta());
- liveDataSize.addAndGet(delta.dataSize());
+ liveDataSize.addAndGet(previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer));
currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
-
- // allocate or free the delta in column overhead after the fact
- for (Cell cell : delta.reclaimed())
- {
- cell.name.free(allocator);
- allocator.free(cell.value);
- }
- allocator.allocate((int) delta.excessHeapSize(), opGroup);
}
// for debugging
@@ -256,10 +243,10 @@ public class Memtable
Map.Entry<? extends RowPosition, ? extends ColumnFamily> entry = iter.next();
// Actual stored key should be true DecoratedKey
assert entry.getKey() instanceof DecoratedKey;
- if (memoryPool.needToCopyOnHeap())
+ if (MEMORY_POOL.needToCopyOnHeap())
{
DecoratedKey key = (DecoratedKey) entry.getKey();
- key = new DecoratedKey(key.token, HeapAllocator.instance.clone(key.key));
+ key = new BufferDecoratedKey(key.getToken(), HeapAllocator.instance.clone(key.getKey()));
ColumnFamily cells = ArrayBackedSortedColumns.localCopy(entry.getValue(), HeapAllocator.instance);
entry = new AbstractMap.SimpleImmutableEntry<>(key, cells);
}
@@ -307,7 +294,7 @@ public class Memtable
{
// make sure we don't write non-sensical keys
assert key instanceof DecoratedKey;
- keySize += ((DecoratedKey)key).key.remaining();
+ keySize += ((DecoratedKey)key).getKey().remaining();
}
estimatedSize = (long) ((keySize // index entries
+ keySize // keys in data file
@@ -410,16 +397,20 @@ public class Memtable
static
{
// calculate row overhead
+ final OpOrder.Group group = new OpOrder().start();
int rowOverhead;
+ MemtableAllocator allocator = MEMORY_POOL.newAllocator();
ConcurrentNavigableMap<RowPosition, Object> rows = new ConcurrentSkipListMap<>();
final int count = 100000;
final Object val = new Object();
for (int i = 0 ; i < count ; i++)
- rows.put(new DecoratedKey(new LongToken((long) i), ByteBufferUtil.EMPTY_BYTE_BUFFER), val);
+ rows.put(allocator.clone(new BufferDecoratedKey(new LongToken((long) i), ByteBufferUtil.EMPTY_BYTE_BUFFER), group), val);
double avgSize = ObjectSizes.measureDeep(rows) / (double) count;
rowOverhead = (int) ((avgSize - Math.floor(avgSize)) < 0.05 ? Math.floor(avgSize) : Math.ceil(avgSize));
rowOverhead -= ObjectSizes.measureDeep(new LongToken((long) 0));
- rowOverhead += AtomicBTreeColumns.HEAP_SIZE;
+ rowOverhead += AtomicBTreeColumns.EMPTY_SIZE;
+ allocator.setDiscarding();
+ allocator.setDiscarded();
ROW_OVERHEAD_HEAP_SIZE = rowOverhead;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 6fae9b0..b64c675 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -64,7 +64,7 @@ public class Mutation implements IMutation
public Mutation(String keyspaceName, Row row)
{
- this(keyspaceName, row.key.key, row.cf);
+ this(keyspaceName, row.key.getKey(), row.cf);
}
protected Mutation(String keyspaceName, ByteBuffer key, Map<UUID, ColumnFamily> modifications)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/NativeCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeCell.java b/src/java/org/apache/cassandra/db/NativeCell.java
new file mode 100644
index 0000000..1c8ebd9
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/NativeCell.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+import org.apache.cassandra.utils.memory.NativeAllocator;
+
+public class NativeCell extends AbstractNativeCell
+{
+ private static final long SIZE = ObjectSizes.measure(new NativeCell());
+
+ NativeCell()
+ {}
+
+ public NativeCell(NativeAllocator allocator, OpOrder.Group writeOp, Cell copyOf)
+ {
+ super(allocator, writeOp, copyOf);
+ }
+
+ @Override
+ public CellName name()
+ {
+ return this;
+ }
+
+ @Override
+ public long timestamp()
+ {
+ return getLong(TIMESTAMP_OFFSET);
+ }
+
+ @Override
+ public Cell localCopy(CFMetaData metadata, AbstractAllocator allocator)
+ {
+ return new BufferCell(copy(metadata, allocator), allocator.clone(value()), timestamp());
+ }
+
+ @Override
+ public Cell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
+ {
+ return allocator.clone(this, metadata, opGroup);
+ }
+
+ @Override
+ public void updateDigest(MessageDigest digest)
+ {
+ updateWithName(digest); // name
+ updateWithValue(digest); // value
+
+ FBUtilities.updateWithLong(digest, timestamp());
+ FBUtilities.updateWithByte(digest, serializationFlags());
+ }
+
+ @Override
+ public long excessHeapSizeExcludingData()
+ {
+ return SIZE;
+ }
+
+ @Override
+ public long unsharedHeapSize()
+ {
+ return SIZE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/NativeCounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeCounterCell.java b/src/java/org/apache/cassandra/db/NativeCounterCell.java
new file mode 100644
index 0000000..abcf598
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/NativeCounterCell.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+import org.apache.cassandra.utils.memory.NativeAllocator;
+
+public class NativeCounterCell extends NativeCell implements CounterCell
+{
+ private static final long SIZE = ObjectSizes.measure(new NativeCounterCell());
+
+ private NativeCounterCell()
+ {}
+
+ public NativeCounterCell(NativeAllocator allocator, OpOrder.Group writeOp, CounterCell copyOf)
+ {
+ super(allocator, writeOp, copyOf);
+ }
+
+ @Override
+ protected void construct(Cell from)
+ {
+ super.construct(from);
+ setLong(internalSize() - 8, ((CounterCell) from).timestampOfLastDelete());
+ }
+
+ @Override
+ protected int postfixSize()
+ {
+ return 8;
+ }
+
+ @Override
+ protected int sizeOf(Cell cell)
+ {
+ return 8 + super.sizeOf(cell);
+ }
+
+ @Override
+ public long timestampOfLastDelete()
+ {
+ return getLong(internalSize() - 8);
+ }
+
+ @Override
+ public long total()
+ {
+ return contextManager.total(value());
+ }
+
+ @Override
+ public boolean hasLegacyShards()
+ {
+ return contextManager.hasLegacyShards(value());
+ }
+
+ @Override
+ public Cell markLocalToBeCleared()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Cell diff(Cell cell)
+ {
+ return diff(this, cell);
+ }
+
+ @Override
+ public Cell reconcile(Cell cell)
+ {
+ return reconcile(this, cell);
+ }
+
+ @Override
+ public int serializationFlags()
+ {
+ return ColumnSerializer.COUNTER_MASK;
+ }
+
+ @Override
+ public int cellDataSize()
+ {
+ // A counter column adds 8 bytes for timestampOfLastDelete to Cell.
+ return super.cellDataSize() + TypeSizes.NATIVE.sizeof(timestampOfLastDelete());
+ }
+
+ @Override
+ public int serializedSize(CellNameType type, TypeSizes typeSizes)
+ {
+ return super.serializedSize(type, typeSizes) + typeSizes.sizeof(timestampOfLastDelete());
+ }
+
+ @Override
+ public void validateFields(CFMetaData metadata) throws MarshalException
+ {
+ validateName(metadata);
+ // We cannot use the value validator as we do for other columns, because CounterColumnType validates a long,
+ // which is not the internal representation of counters; the counter context is validated instead.
+ contextManager.validateContext(value());
+ }
+
+ /*
+ * We have to special case digest creation for counter column because
+ * we don't want to include the information about which shard of the
+ * context is a delta or not, since this information differs from node to
+ * node.
+ */
+ @Override
+ public void updateDigest(MessageDigest digest)
+ {
+ updateWithName(digest);
+
+ // We don't take the deltas into account in a digest
+ contextManager.updateDigest(digest, value());
+
+ FBUtilities.updateWithLong(digest, timestamp());
+ FBUtilities.updateWithByte(digest, serializationFlags());
+ FBUtilities.updateWithLong(digest, timestampOfLastDelete());
+ }
+
+ @Override
+ public String getString(CellNameType comparator)
+ {
+ return String.format("%s(%s:false:%s@%d!%d)",
+ getClass().getSimpleName(),
+ comparator.getString(name()),
+ contextManager.toString(value()),
+ timestamp(),
+ timestampOfLastDelete());
+ }
+
+ @Override
+ public CounterCell localCopy(CFMetaData metadata, AbstractAllocator allocator)
+ {
+ return new BufferCounterCell(copy(metadata, allocator), allocator.clone(value()), timestamp(), timestampOfLastDelete());
+ }
+
+ @Override
+ public CounterCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
+ {
+ return allocator.clone(this, metadata, opGroup);
+ }
+
+ @Override
+ public long excessHeapSizeExcludingData()
+ {
+ return SIZE;
+ }
+
+ @Override
+ public long unsharedHeapSize()
+ {
+ return SIZE;
+ }
+
+ public boolean equals(Cell cell)
+ {
+ return cell instanceof CounterCell && equals((CounterCell) cell); // delegate with the argument; the original cast and passed `this`, so the other cell was never compared
+ }
+
+ public boolean equals(CounterCell cell)
+ {
+ return super.equals(cell) && timestampOfLastDelete() == cell.timestampOfLastDelete();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/NativeDecoratedKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeDecoratedKey.java b/src/java/org/apache/cassandra/db/NativeDecoratedKey.java
new file mode 100644
index 0000000..52aa50c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/NativeDecoratedKey.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+import org.apache.cassandra.utils.memory.NativeAllocator;
+
+public class NativeDecoratedKey extends DecoratedKey
+{
+ private final long peer; // value returned by allocator.allocate(); presumably a native memory address - confirm against NativeAllocator
+
+ public NativeDecoratedKey(Token token, NativeAllocator allocator, OpOrder.Group writeOp, ByteBuffer key)
+ {
+ super(token);
+ assert key != null;
+ int size = key.remaining();
+ this.peer = allocator.allocate(4 + size, writeOp); // 4 bytes for the length prefix plus the key's remaining bytes
+ MemoryUtil.setInt(peer, size); // length prefix is written at offset 0
+ MemoryUtil.setBytes(peer + 4, key); // key bytes are written immediately after the prefix
+ }
+
+ public ByteBuffer getKey()
+ {
+ return MemoryUtil.getByteBuffer(peer + 4, MemoryUtil.getInt(peer)); // length is read back from the prefix written by the constructor
+ }
+}