You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/01/16 20:08:03 UTC
[2/2] git commit: Add support for 2.1 global counter shards
Add support for 2.1 global counter shards
patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-6505
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/83cd80b2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/83cd80b2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/83cd80b2
Branch: refs/heads/cassandra-2.0
Commit: 83cd80b277fa6e73be1c7d66b80669731f0f92c0
Parents: 1d4caf4
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Jan 16 22:07:20 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Jan 16 22:07:20 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/CounterColumn.java | 125 +--
.../cassandra/db/CounterUpdateColumn.java | 4 +-
.../db/compaction/CompactionController.java | 6 -
.../db/compaction/LazilyCompactedRow.java | 6 +-
.../compaction/ParallelCompactionIterable.java | 2 +-
.../db/compaction/PrecompactedRow.java | 32 +-
.../cassandra/db/context/CounterContext.java | 765 +++++++++----------
.../io/sstable/AbstractSSTableSimpleWriter.java | 5 +-
.../cassandra/io/sstable/SSTableWriter.java | 2 +-
.../apache/cassandra/db/CounterColumnTest.java | 105 +--
.../cassandra/db/CounterMutationTest.java | 115 ---
.../db/context/CounterContextTest.java | 703 ++++++++---------
.../streaming/StreamingTransferTest.java | 19 +-
14 files changed, 789 insertions(+), 1101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ac8c428..d994612 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@
* Improve replica pinning for cache efficiency in DES (CASSANDRA-6485)
* Fix LOCAL_SERIAL from thrift (CASSANDRA-6584)
* Don't special case received counts in CAS timeout exceptions (CASSANDRA-6595)
+ * Add support for 2.1 global counter shards (CASSANDRA-6505)
Merged from 1.2:
* fsync compression metadata (CASSANDRA-6531)
* Validate CF existence on execution for prepared statement (CASSANDRA-6535)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index b470c5a..28a2ba5 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -19,27 +19,18 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.net.InetAddress;
import java.security.MessageDigest;
-import java.util.Set;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import org.apache.cassandra.serializers.MarshalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.context.IContext.ContextRelationship;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.exceptions.OverloadedException;
-import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.service.AbstractWriteResponseHandler;
-import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.utils.*;
/**
@@ -55,12 +46,12 @@ public class CounterColumn extends Column
public CounterColumn(ByteBuffer name, long value, long timestamp)
{
- this(name, contextManager.create(value, HeapAllocator.instance), timestamp);
+ this(name, contextManager.createLocal(value, HeapAllocator.instance), timestamp);
}
public CounterColumn(ByteBuffer name, long value, long timestamp, long timestampOfLastDelete)
{
- this(name, contextManager.create(value, HeapAllocator.instance), timestamp, timestampOfLastDelete);
+ this(name, contextManager.createLocal(value, HeapAllocator.instance), timestamp, timestampOfLastDelete);
}
public CounterColumn(ByteBuffer name, ByteBuffer value, long timestamp)
@@ -76,10 +67,8 @@ public class CounterColumn extends Column
public static CounterColumn create(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete, ColumnSerializer.Flag flag)
{
- // #elt being negative means we have to clean delta
- short count = value.getShort(value.position());
- if (flag == ColumnSerializer.Flag.FROM_REMOTE || (flag == ColumnSerializer.Flag.LOCAL && count < 0))
- value = CounterContext.instance().clearAllDelta(value);
+ if (flag == ColumnSerializer.Flag.FROM_REMOTE || (flag == ColumnSerializer.Flag.LOCAL && contextManager.shouldClearLocal(value)))
+ value = contextManager.clearAllLocal(value);
return new CounterColumn(name, value, timestamp, timestampOfLastDelete);
}
@@ -265,107 +254,9 @@ public class CounterColumn extends Column
return contextManager.hasCounterId(value(), id);
}
- private CounterColumn computeOldShardMerger(int mergeBefore)
- {
- ByteBuffer bb = contextManager.computeOldShardMerger(value(), CounterId.getOldLocalCounterIds(), mergeBefore);
- if (bb == null)
- return null;
- else
- return new CounterColumn(name(), bb, timestamp(), timestampOfLastDelete);
- }
-
- private CounterColumn removeOldShards(int gcBefore)
- {
- ByteBuffer bb = contextManager.removeOldShards(value(), gcBefore);
- if (bb == value())
- return this;
- else
- {
- return new CounterColumn(name(), bb, timestamp(), timestampOfLastDelete);
- }
- }
-
- public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily cf, int gcBefore, int mergeBefore)
- {
- mergeAndRemoveOldShards(key, cf, gcBefore, mergeBefore, true);
- }
-
- /**
- * There is two phase to the removal of old shards.
- * First phase: we merge the old shard value to the current shard and
- * 'nulify' the old one. We then send the counter context with the old
- * shard nulified to all other replica.
- * Second phase: once an old shard has been nulified for longer than
- * gc_grace (to be sure all other replica had been aware of the merge), we
- * simply remove that old shard from the context (it's value is 0).
- * This method does both phases.
- * (Note that the sendToOtherReplica flag is here only to facilitate
- * testing. It should be true in real code so use the method above
- * preferably)
- */
- public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily cf, int gcBefore, int mergeBefore, boolean sendToOtherReplica)
- {
- ColumnFamily remoteMerger = null;
-
- for (Column c : cf)
- {
- if (!(c instanceof CounterColumn))
- continue;
- CounterColumn cc = (CounterColumn) c;
- CounterColumn shardMerger = cc.computeOldShardMerger(mergeBefore);
- CounterColumn merged = cc;
- if (shardMerger != null)
- {
- merged = (CounterColumn) cc.reconcile(shardMerger);
- if (remoteMerger == null)
- remoteMerger = cf.cloneMeShallow();
- remoteMerger.addColumn(merged);
- }
- CounterColumn cleaned = merged.removeOldShards(gcBefore);
- if (cleaned != cc)
- {
- cf.replace(cc, cleaned);
- }
- }
-
- if (remoteMerger != null && sendToOtherReplica)
- {
- try
- {
- sendToOtherReplica(key, remoteMerger);
- }
- catch (Exception e)
- {
- logger.error("Error while sending shard merger mutation to remote endpoints", e);
- }
- }
- }
-
- public Column markDeltaToBeCleared()
+ public Column markLocalToBeCleared()
{
- return new CounterColumn(name, contextManager.markDeltaToBeCleared(value), timestamp, timestampOfLastDelete);
- }
-
- private static void sendToOtherReplica(DecoratedKey key, ColumnFamily cf) throws RequestExecutionException
- {
- RowMutation rm = new RowMutation(cf.metadata().ksName, key.key, cf);
-
- final InetAddress local = FBUtilities.getBroadcastAddress();
- String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(local);
-
- StorageProxy.performWrite(rm, ConsistencyLevel.ANY, localDataCenter, new StorageProxy.WritePerformer()
- {
- public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
- throws OverloadedException
- {
- // We should only send to the remote replica, not the local one
- Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), ImmutableSet.of(local));
- // Fake local response to be a good lad but we won't wait on the responseHandler
- responseHandler.response(null);
- StorageProxy.sendToHintedEndpoints((RowMutation) mutation, remotes, responseHandler, localDataCenter);
- }
- }, null, WriteType.SIMPLE);
-
- // we don't wait for answers
+ ByteBuffer marked = contextManager.markLocalToBeCleared(value);
+ return marked == value ? this : new CounterColumn(name, marked, timestamp, timestampOfLastDelete);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
index 1ae7dd7..422beee 100644
--- a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
@@ -82,7 +82,7 @@ public class CounterUpdateColumn extends Column
public CounterColumn localCopy(ColumnFamilyStore cfs)
{
return new CounterColumn(cfs.internOrCopy(name, HeapAllocator.instance),
- CounterContext.instance().create(delta(), HeapAllocator.instance),
+ CounterContext.instance().createLocal(delta(), HeapAllocator.instance),
timestamp(),
Long.MIN_VALUE);
}
@@ -91,7 +91,7 @@ public class CounterUpdateColumn extends Column
public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
{
return new CounterColumn(cfs.internOrCopy(name, allocator),
- CounterContext.instance().create(delta(), allocator),
+ CounterContext.instance().createLocal(delta(), allocator),
timestamp(),
Long.MIN_VALUE);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 7edc60e..fba659d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -49,7 +49,6 @@ public class CompactionController
private final Set<SSTableReader> compacting;
public final int gcBefore;
- public final int mergeShardBefore;
/**
* Constructor that subclasses may use when overriding shouldPurge to not need overlappingTree
@@ -65,11 +64,6 @@ public class CompactionController
this.cfs = cfs;
this.gcBefore = gcBefore;
this.compacting = compacting;
- // If we merge an old CounterId id, we must make sure that no further increment for that id are in an active memtable.
- // For that, we must make sure that this id was renewed before the creation of the oldest unflushed memtable. We
- // add 5 minutes to be sure we're on the safe side in terms of thread safety (though we should be fine in our
- // current 'stop all write during memtable switch' situation).
- this.mergeShardBefore = (int) ((cfs.oldestUnflushedMemtable() + 5 * 3600) / 1000);
Set<SSTableReader> overlapping = compacting == null ? null : cfs.getAndReferenceOverlappingSSTables(compacting);
this.overlappingSSTables = overlapping == null ? Collections.<SSTableReader>emptySet() : overlapping;
this.overlappingTree = overlapping == null ? null : DataTracker.buildIntervalTree(overlapping);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 0ad3de2..998f8cc 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -259,7 +259,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
{
// when we clear() the container, it removes the deletion info, so this needs to be reset each time
container.delete(maxRowTombstone);
- ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
+ ColumnFamily purged = PrecompactedRow.removeDeleted(key, shouldPurge, controller, container);
if (purged == null || !purged.iterator().hasNext())
{
container.clear();
@@ -268,8 +268,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
Column reduced = purged.iterator().next();
container.clear();
- // PrecompactedRow.removeDeletedAndOldShards have only checked the top-level CF deletion times,
- // not the range tombstone. For that we use the columnIndexer tombstone tracker.
+ // PrecompactedRow.removeDeleted has only checked the top-level CF deletion times,
+ // not the range tombstones. For that we use the columnIndexer tombstone tracker.
if (indexBuilder.tombstoneTracker().isDeleted(reduced))
{
indexer.remove(reduced);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index fdddc4b..f1790d8 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -187,7 +187,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
}
PrecompactedRow.merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).key));
- return PrecompactedRow.removeDeletedAndOldShards(rows.get(0).key, controller, returnCF);
+ return PrecompactedRow.removeDeleted(rows.get(0).key, controller, returnCF);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index b225f53..d45bffa 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -44,14 +44,14 @@ public class PrecompactedRow extends AbstractCompactedRow
{
private final ColumnFamily compactedCf;
- /** it is caller's responsibility to call removeDeleted + removeOldShards from the cf before calling this constructor */
+ // it is the caller's responsibility to call removeDeleted on the cf before calling this constructor
public PrecompactedRow(DecoratedKey key, ColumnFamily cf)
{
super(key);
compactedCf = cf;
}
- public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, CompactionController controller, ColumnFamily cf)
+ public static ColumnFamily removeDeleted(DecoratedKey key, CompactionController controller, ColumnFamily cf)
{
assert key != null;
assert controller != null;
@@ -59,7 +59,7 @@ public class PrecompactedRow extends AbstractCompactedRow
// avoid calling shouldPurge unless we actually need to: it can be very expensive if LCS
// gets behind and has hundreds of overlapping L0 sstables. Essentially, this method is an
- // ugly refactor of removeDeletedAndOldShards(controller.shouldPurge(key), controller, cf),
+ // ugly refactor of removeDeleted(controller.shouldPurge(key), controller, cf),
// taking this into account.
Boolean shouldPurge = null;
@@ -69,34 +69,20 @@ public class PrecompactedRow extends AbstractCompactedRow
// We should only gc tombstone if shouldPurge == true. But otherwise,
// it is still ok to collect column that shadowed by their (deleted)
// container, which removeDeleted(cf, Integer.MAX_VALUE) will do
- ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf, shouldPurge != null && shouldPurge ? controller.gcBefore : Integer.MIN_VALUE);
-
- if (compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
- {
- if (shouldPurge == null)
- shouldPurge = controller.shouldPurge(key, cf.deletionInfo().maxTimestamp());
- if (shouldPurge)
- CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
- }
-
- return compacted;
+ return ColumnFamilyStore.removeDeleted(cf, shouldPurge != null && shouldPurge ? controller.gcBefore : Integer.MIN_VALUE);
}
- public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
+ public static ColumnFamily removeDeleted(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
{
// See comment in preceding method
- ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf,
- shouldPurge ? controller.gcBefore : Integer.MIN_VALUE,
- controller.cfs.indexManager.updaterFor(key));
- if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
- CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
- return compacted;
+ return ColumnFamilyStore.removeDeleted(cf,
+ shouldPurge ? controller.gcBefore : Integer.MIN_VALUE,
+ controller.cfs.indexManager.updaterFor(key));
}
public PrecompactedRow(CompactionController controller, List<SSTableIdentityIterator> rows)
{
- this(rows.get(0).getKey(),
- removeDeletedAndOldShards(rows.get(0).getKey(), controller, merge(rows, controller)));
+ this(rows.get(0).getKey(), removeDeleted(rows.get(0).getKey(), controller, merge(rows, controller)));
}
private static ColumnFamily merge(List<SSTableIdentityIterator> rows, CompactionController controller)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/src/java/org/apache/cassandra/db/context/CounterContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/context/CounterContext.java b/src/java/org/apache/cassandra/db/context/CounterContext.java
index 634ee22..1fa4d60 100644
--- a/src/java/org/apache/cassandra/db/context/CounterContext.java
+++ b/src/java/org/apache/cassandra/db/context/CounterContext.java
@@ -20,27 +20,27 @@ package org.apache.cassandra.db.context;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
-import org.apache.cassandra.serializers.MarshalException;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.*;
/**
* An implementation of a partitioned counter context.
*
* A context is primarily a list of tuples (counter id, clock, count) -- called
- * shard in the following. But with some shard are flagged as delta (with
+ * shards, with some shards flagged as global or local (with
* special resolution rules in merge()).
*
* The data structure has two parts:
- * a) a header containing the lists of "delta" (a list of references to the second parts)
- * b) a list of shard -- (counter id, logical clock, count) tuples -- (the so-called 'body' below)
+ * a) a header containing the lists of global and local shard indexes in the body
+ * b) a list of shards -- (counter id, logical clock, count) tuples -- (the so-called 'body' below)
*
* The exact layout is:
* | header | body |
@@ -49,6 +49,9 @@ import org.apache.cassandra.utils.*;
* | list of indices in the body list (2*#elt bytes)
* #elt in rest of header (2 bytes)
*
+ * Non-negative indices refer to local shards. Global shard indices are encoded as [idx + Short.MIN_VALUE],
+ * and are thus always negative.
+ *
* The body layout being:
*
* body: |----|----|----|----|----|----|....
@@ -57,14 +60,17 @@ import org.apache.cassandra.utils.*;
* | clock_1 | clock_2
* counterid_1 counterid_2
*
- * The rules when merging two shard with the same counterid are:
- * - delta + delta = sum counts (and logical clock)
- * - delta + other = keep the delta one
- * - other + other = keep the shard with highest logical clock
+ * The rules when merging two shards with the same counter id are:
+ * - global + global = keep the shard with the highest logical clock
+ * - global + local = keep the global one
+ * - global + remote = keep the global one
+ * - local + local = sum counts (and logical clocks)
+ * - local + remote = keep the local one
+ * - remote + remote = keep the shard with the highest logical clock
*
- * For a detailed description of the meaning of a delta and why the merging
+ * For a detailed description of the meaning of a local shard and why the merging
* rules work this way, see CASSANDRA-1938 - specifically the 1938_discussion
- * attachment.
+ * attachment (doesn't cover global shards, see CASSANDRA-4775 for that).
*/
public class CounterContext implements IContext
{
@@ -88,45 +94,23 @@ public class CounterContext implements IContext
}
/**
- * Creates an initial counter context with an initial value for the local node.
- *
- *
- * @param value the value for this initial update
- *
- * @param allocator
- * @return an empty counter context.
+ * Creates a counter context with a single local shard.
*/
- public ByteBuffer create(long value, Allocator allocator)
+ public ByteBuffer createLocal(long count, Allocator allocator)
{
- ByteBuffer context = allocator.allocate(HEADER_SIZE_LENGTH + HEADER_ELT_LENGTH + STEP_LENGTH);
- // The first (and only) elt is a delta
- context.putShort(context.position(), (short)1);
- context.putShort(context.position() + HEADER_SIZE_LENGTH, (short)0);
- writeElementAtOffset(context, context.position() + HEADER_SIZE_LENGTH + HEADER_ELT_LENGTH, CounterId.getLocalId(), 1L, value);
- return context;
+ ContextState state = ContextState.allocate(0, 1, 0, allocator);
+ state.writeLocal(CounterId.getLocalId(), 1L, count);
+ return state.context;
}
- // Provided for use by unit tests
- public ByteBuffer create(CounterId id, long clock, long value, boolean isDelta)
- {
- ByteBuffer context = ByteBuffer.allocate(HEADER_SIZE_LENGTH + (isDelta ? HEADER_ELT_LENGTH : 0) + STEP_LENGTH);
- context.putShort(context.position(), (short)(isDelta ? 1 : 0));
- if (isDelta)
- {
- context.putShort(context.position() + HEADER_SIZE_LENGTH, (short)0);
- }
- writeElementAtOffset(context, context.position() + HEADER_SIZE_LENGTH + (isDelta ? HEADER_ELT_LENGTH : 0), id, clock, value);
- return context;
- }
-
- // write a tuple (counter id, clock, count) at an absolute (bytebuffer-wise) offset
- private static void writeElementAtOffset(ByteBuffer context, int offset, CounterId id, long clock, long count)
+ /**
+ * Creates a counter context with a single remote shard.
+ */
+ public ByteBuffer createRemote(CounterId id, long clock, long count, Allocator allocator)
{
- context = context.duplicate();
- context.position(offset);
- context.put(id.bytes().duplicate());
- context.putLong(clock);
- context.putLong(count);
+ ContextState state = ContextState.allocate(0, 0, 1, allocator);
+ state.writeRemote(id, clock, count);
+ return state.context;
}
private static int headerLength(ByteBuffer context)
@@ -156,8 +140,8 @@ public class CounterContext implements IContext
public ContextRelationship diff(ByteBuffer left, ByteBuffer right)
{
ContextRelationship relationship = ContextRelationship.EQUAL;
- ContextState leftState = new ContextState(left, headerLength(left));
- ContextState rightState = new ContextState(right, headerLength(right));
+ ContextState leftState = ContextState.wrap(left);
+ ContextState rightState = ContextState.wrap(right);
while (leftState.hasRemaining() && rightState.hasRemaining())
{
@@ -298,107 +282,90 @@ public class CounterContext implements IContext
*/
public ByteBuffer merge(ByteBuffer left, ByteBuffer right, Allocator allocator)
{
- ContextState leftState = new ContextState(left, headerLength(left));
- ContextState rightState = new ContextState(right, headerLength(right));
+ int globalCount = 0;
+ int localCount = 0;
+ int remoteCount = 0;
- // Compute size of result
- int mergedHeaderLength = HEADER_SIZE_LENGTH;
- int mergedBodyLength = 0;
+ ContextState leftState = ContextState.wrap(left);
+ ContextState rightState = ContextState.wrap(right);
while (leftState.hasRemaining() && rightState.hasRemaining())
{
int cmp = leftState.compareIdTo(rightState);
if (cmp == 0)
{
- mergedBodyLength += STEP_LENGTH;
- if (leftState.isDelta() || rightState.isDelta())
- mergedHeaderLength += HEADER_ELT_LENGTH;
+ if (leftState.isGlobal() || rightState.isGlobal())
+ globalCount += 1;
+ else if (leftState.isLocal() || rightState.isLocal())
+ localCount += 1;
+ else
+ remoteCount += 1;
+
leftState.moveToNext();
rightState.moveToNext();
}
else if (cmp > 0)
{
- mergedBodyLength += STEP_LENGTH;
- if (rightState.isDelta())
- mergedHeaderLength += HEADER_ELT_LENGTH;
+ if (rightState.isGlobal())
+ globalCount += 1;
+ else if (rightState.isLocal())
+ localCount += 1;
+ else
+ remoteCount += 1;
+
rightState.moveToNext();
}
else // cmp < 0
{
- mergedBodyLength += STEP_LENGTH;
- if (leftState.isDelta())
- mergedHeaderLength += HEADER_ELT_LENGTH;
+ if (leftState.isGlobal())
+ globalCount += 1;
+ else if (leftState.isLocal())
+ localCount += 1;
+ else
+ remoteCount += 1;
+
leftState.moveToNext();
}
}
- mergedHeaderLength += leftState.remainingHeaderLength() + rightState.remainingHeaderLength();
- mergedBodyLength += leftState.remainingBodyLength() + rightState.remainingBodyLength();
- // Do the actual merge
- ByteBuffer merged = allocator.allocate(mergedHeaderLength + mergedBodyLength);
- merged.putShort(merged.position(), (short) ((mergedHeaderLength - HEADER_SIZE_LENGTH) / HEADER_ELT_LENGTH));
- ContextState mergedState = new ContextState(merged, mergedHeaderLength);
+ while (leftState.hasRemaining())
+ {
+ if (leftState.isGlobal())
+ globalCount += 1;
+ else if (leftState.isLocal())
+ localCount += 1;
+ else
+ remoteCount += 1;
+
+ leftState.moveToNext();
+ }
+
+ while (rightState.hasRemaining())
+ {
+ if (rightState.isGlobal())
+ globalCount += 1;
+ else if (rightState.isLocal())
+ localCount += 1;
+ else
+ remoteCount += 1;
+
+ rightState.moveToNext();
+ }
+
leftState.reset();
rightState.reset();
+
+ return merge(ContextState.allocate(globalCount, localCount, remoteCount, allocator), leftState, rightState);
+ }
+
+ private ByteBuffer merge(ContextState mergedState, ContextState leftState, ContextState rightState)
+ {
while (leftState.hasRemaining() && rightState.hasRemaining())
{
int cmp = leftState.compareIdTo(rightState);
if (cmp == 0)
{
- if (leftState.isDelta() || rightState.isDelta())
- {
- // Local id and at least one is a delta
- if (leftState.isDelta() && rightState.isDelta())
- {
- // both delta, sum
- long clock = leftState.getClock() + rightState.getClock();
- long count = leftState.getCount() + rightState.getCount();
- mergedState.writeElement(leftState.getCounterId(), clock, count, true);
- }
- else
- {
- // Only one have delta, keep that one
- (leftState.isDelta() ? leftState : rightState).copyTo(mergedState);
- }
- }
- else
- {
- long leftClock = leftState.getClock();
- long rightClock = rightState.getClock();
-
- if (leftClock == rightClock)
- {
- // We should never see non-delta shards w/ same id+clock but different counts. However, if we do
- // we should "heal" the problem by being deterministic in our selection of shard - and
- // log the occurrence so that the operator will know something is wrong.
- long leftCount = leftState.getCount();
- long rightCount = rightState.getCount();
-
- if (leftCount != rightCount && CompactionManager.isCompactionManager.get())
- {
- logger.warn("invalid counter shard detected; ({}, {}, {}) and ({}, {}, {}) differ only in "
- + "count; will pick highest to self-heal on compaction",
- leftState.getCounterId(), leftClock, leftCount, rightState.getCounterId(), rightClock, rightCount);
- }
-
- if (leftCount > rightCount)
- {
- leftState.copyTo(mergedState);
- }
- else
- {
- rightState.copyTo(mergedState);
- }
- }
- else
- {
- if ((leftClock >= 0 && rightClock > 0 && leftClock >= rightClock)
- || (leftClock < 0 && (rightClock > 0 || leftClock < rightClock)))
- leftState.copyTo(mergedState);
- else
- rightState.copyTo(mergedState);
- }
- }
+ mergeTie(mergedState, leftState, rightState);
rightState.moveToNext();
leftState.moveToNext();
}
@@ -413,18 +380,110 @@ public class CounterContext implements IContext
leftState.moveToNext();
}
}
+
while (leftState.hasRemaining())
{
leftState.copyTo(mergedState);
leftState.moveToNext();
}
+
while (rightState.hasRemaining())
{
rightState.copyTo(mergedState);
rightState.moveToNext();
}
- return merged;
+ return mergedState.context;
+ }
+
+ private void mergeTie(ContextState mergedState, ContextState leftState, ContextState rightState)
+ {
+ if (leftState.isGlobal() || rightState.isGlobal())
+ {
+ if (leftState.isGlobal() && rightState.isGlobal())
+ {
+ long leftClock = leftState.getClock();
+ long rightClock = rightState.getClock();
+
+ if (leftClock == rightClock)
+ {
+ long leftCount = leftState.getCount();
+ long rightCount = rightState.getCount();
+
+ // Can happen if an sstable gets lost and disk failure policy is set to 'best effort'
+ if (leftCount != rightCount && CompactionManager.isCompactionManager.get())
+ {
+ logger.warn("invalid global counter shard detected; ({}, {}, {}) and ({}, {}, {}) differ only in "
+ + "count; will pick highest to self-heal on compaction",
+ leftState.getCounterId(), leftClock, leftCount,
+ rightState.getCounterId(), rightClock, rightCount);
+ }
+
+ if (leftCount > rightCount)
+ leftState.copyTo(mergedState);
+ else
+ rightState.copyTo(mergedState);
+ }
+ else
+ {
+ (leftClock > rightClock ? leftState : rightState).copyTo(mergedState);
+ }
+ }
+ else // only one is global - keep that one
+ {
+ (leftState.isGlobal() ? leftState : rightState).copyTo(mergedState);
+ }
+ }
+ else if (leftState.isLocal() || rightState.isLocal())
+ {
+ // Local id and at least one is a local shard.
+ if (leftState.isLocal() && rightState.isLocal())
+ {
+ // both local - sum
+ long clock = leftState.getClock() + rightState.getClock();
+ long count = leftState.getCount() + rightState.getCount();
+ mergedState.writeLocal(leftState.getCounterId(), clock, count);
+ }
+ else // only one is local - keep that one
+ {
+ (leftState.isLocal() ? leftState : rightState).copyTo(mergedState);
+ }
+ }
+ else // both are remote shards
+ {
+ long leftClock = leftState.getClock();
+ long rightClock = rightState.getClock();
+
+ if (leftClock == rightClock)
+ {
+ // We should never see non-local shards w/ same id+clock but different counts. However, if we do
+ // we should "heal" the problem by being deterministic in our selection of shard - and
+ // log the occurrence so that the operator will know something is wrong.
+ long leftCount = leftState.getCount();
+ long rightCount = rightState.getCount();
+
+ if (leftCount != rightCount && CompactionManager.isCompactionManager.get())
+ {
+ logger.warn("invalid remote counter shard detected; ({}, {}, {}) and ({}, {}, {}) differ only in "
+ + "count; will pick highest to self-heal on compaction",
+ leftState.getCounterId(), leftClock, leftCount,
+ rightState.getCounterId(), rightClock, rightCount);
+ }
+
+ if (leftCount > rightCount)
+ leftState.copyTo(mergedState);
+ else
+ rightState.copyTo(mergedState);
+ }
+ else
+ {
+ if ((leftClock >= 0 && rightClock > 0 && leftClock >= rightClock)
+ || (leftClock < 0 && (rightClock > 0 || leftClock < rightClock)))
+ leftState.copyTo(mergedState);
+ else
+ rightState.copyTo(mergedState);
+ }
+ }
}
/**
@@ -435,25 +494,23 @@ public class CounterContext implements IContext
*/
public String toString(ByteBuffer context)
{
- ContextState state = new ContextState(context, headerLength(context));
+ ContextState state = ContextState.wrap(context);
StringBuilder sb = new StringBuilder();
sb.append("[");
while (state.hasRemaining())
{
- if (state.elementIdx() > 0)
- {
+ if (state.getElementIndex() > 0)
sb.append(",");
- }
sb.append("{");
sb.append(state.getCounterId().toString()).append(", ");
- sb.append(state.getClock()).append(", ");;
+ sb.append(state.getClock()).append(", ");
sb.append(state.getCount());
sb.append("}");
- if (state.isDelta())
- {
+ if (state.isGlobal())
+ sb.append("$");
+ else if (state.isLocal())
sb.append("*");
- }
state.moveToNext();
}
@@ -481,49 +538,87 @@ public class CounterContext implements IContext
return total;
}
+ public boolean shouldClearLocal(ByteBuffer context)
+ {
+ // A negative #elt is the mark left by markLocalToBeCleared: local shards have to be cleared.
+ return context.getShort(context.position()) < 0;
+ }
+
/**
- * Mark context to delete delta afterward.
+ * Mark context so that its local shards are deleted afterward.
* Marking is done by multiply #elt by -1 to preserve header length
- * and #elt count in order to clear all delta later.
+ * and #elt count in order to clear all local shards later.
*
* @param context a counter context
- * @return context that marked to delete delta
+ * @return context itself if there is nothing to mark, otherwise a marked copy of it
*/
- public ByteBuffer markDeltaToBeCleared(ByteBuffer context)
+ public ByteBuffer markLocalToBeCleared(ByteBuffer context)
{
- int headerLength = headerLength(context);
- if (headerLength == HEADER_SIZE_LENGTH)
- return context;
-
- ByteBuffer marked = context.duplicate();
short count = context.getShort(context.position());
- // negate #elt to mark as deleted, without changing its size.
- if (count > 0)
- marked.putShort(marked.position(), (short) (count * -1));
+ if (count <= 0)
+ return context; // already marked or all are remote.
+
+ boolean hasLocalShards = false;
+ for (int i = 0; i < count; i++)
+ {
+ if (context.getShort(context.position() + HEADER_SIZE_LENGTH + i * HEADER_ELT_LENGTH) >= 0) // non-negative header elt = local shard (global elts are negative)
+ {
+ hasLocalShards = true;
+ break;
+ }
+ }
+
+ if (!hasLocalShards)
+ return context; // all shards are global or remote.
+
+ ByteBuffer marked = ByteBuffer.allocate(context.remaining());
+ marked.putShort(marked.position(), (short) (count * -1)); // negate #elt to mark, without changing the header size
+ ByteBufferUtil.arrayCopy(context,
+ context.position() + HEADER_SIZE_LENGTH,
+ marked,
+ marked.position() + HEADER_SIZE_LENGTH,
+ context.remaining() - HEADER_SIZE_LENGTH);
return marked;
}
/**
- * Remove all the delta of a context (i.e, set an empty header).
+ * Remove all the local shards of a context (but keep the global ones).
*
* @param context a counter context
- * @return a version of {@code context} where no count are a delta.
+ * @return a version of {@code context} where no shards are local.
*/
- public ByteBuffer clearAllDelta(ByteBuffer context)
+ public ByteBuffer clearAllLocal(ByteBuffer context)
{
- int headerLength = headerLength(context);
- if (headerLength == HEADER_SIZE_LENGTH)
- return context;
-
- ByteBuffer cleaned = ByteBuffer.allocate(context.remaining() - headerLength + HEADER_SIZE_LENGTH);
- cleaned.putShort(cleaned.position(), (short)0);
- ByteBufferUtil.arrayCopy(
- context,
- context.position() + headerLength,
- cleaned,
- cleaned.position() + HEADER_SIZE_LENGTH,
- context.remaining() - headerLength);
- return cleaned;
+ int count = Math.abs(context.getShort(context.position())); // abs(): #elt may have been negated by markLocalToBeCleared
+ if (count == 0)
+ return context; // no local or global shards present.
+
+ List<Short> globalShardIndexes = new ArrayList<>(count); // raw global header elts (element index + Short.MIN_VALUE)
+ for (int i = 0; i < count; i++)
+ {
+ short elt = context.getShort(context.position() + HEADER_SIZE_LENGTH + i * HEADER_ELT_LENGTH);
+ if (elt < 0)
+ globalShardIndexes.add(elt);
+ }
+
+ if (count == globalShardIndexes.size())
+ return context; // no local shards detected.
+
+ // allocate a smaller BB for the cleared context - with no local header elts.
+ ByteBuffer cleared = ByteBuffer.allocate(context.remaining() - (count - globalShardIndexes.size()) * HEADER_ELT_LENGTH);
+
+ cleared.putShort(cleared.position(), (short) globalShardIndexes.size()); // positive #elt: the cleared copy carries no clear-local mark
+ for (int i = 0; i < globalShardIndexes.size(); i++)
+ cleared.putShort(cleared.position() + HEADER_SIZE_LENGTH + i * HEADER_ELT_LENGTH, globalShardIndexes.get(i));
+
+ int origHeaderLength = headerLength(context); // the body is copied unchanged; only the header shrinks
+ ByteBufferUtil.arrayCopy(context,
+ context.position() + origHeaderLength,
+ cleared,
+ cleared.position() + headerLength(cleared),
+ context.remaining() - origHeaderLength);
+
+ return cleared;
}
public void validateContext(ByteBuffer context) throws MarshalException
@@ -535,15 +630,14 @@ public class CounterContext implements IContext
/**
* Update a MessageDigest with the content of a context.
* Note that this skips the header entirely since the header information
- * has local meaning only, while digests a meant for comparison across
+ * has local meaning only, while digests are meant for comparison across
* nodes. This means in particular that we always have:
- * updateDigest(ctx) == updateDigest(clearAllDelta(ctx))
+ * updateDigest(ctx) == updateDigest(clearAllLocal(ctx))
*/
public void updateDigest(MessageDigest message, ByteBuffer context)
{
- int hlength = headerLength(context);
ByteBuffer dup = context.duplicate();
- dup.position(context.position() + hlength);
+ dup.position(context.position() + headerLength(context)); // skip the header; only the body tuples are digested
message.update(dup);
}
@@ -569,199 +663,6 @@ public class CounterContext implements IContext
}
/**
- * Compute a new context such that if applied to context yields the same
- * total but with old local counter ids nulified and there content merged to
- * the current localCounterId.
- */
- public ByteBuffer computeOldShardMerger(ByteBuffer context, List<CounterId.CounterIdRecord> oldIds, long mergeBefore)
- {
- long now = System.currentTimeMillis();
- int hlength = headerLength(context);
- CounterId localId = CounterId.getLocalId();
-
- Iterator<CounterId.CounterIdRecord> recordIterator = oldIds.iterator();
- CounterId.CounterIdRecord currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
-
- ContextState state = new ContextState(context, hlength);
-
- List<CounterId> toMerge = new ArrayList<CounterId>();
- long mergeTotal = 0;
- while (state.hasRemaining() && currRecord != null)
- {
- assert !currRecord.id.equals(localId);
-
- CounterId counterId = state.getCounterId();
- int c = counterId.compareTo(currRecord.id);
-
- if (c > 0)
- {
- currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
- continue;
- }
-
- if (state.isDelta())
- {
- if (state.getClock() < 0)
- {
- // Already merged shard, waiting to be collected
-
- if (counterId.equals(localId))
- // we should not get there, but we have been creating problematic context prior to #2968
- throw new RuntimeException("Current counterId with a negative clock (likely due to #2968). You need to restart this node with -Dcassandra.renew_counter_id=true to fix.");
-
- if (state.getCount() != 0)
- {
- // This should not happen, but previous bugs have generated this (#2968 in particular) so fixing it.
- logger.error(String.format("Invalid counter context (clock is %d and count is %d for CounterId %s), will fix", state.getCount(), state.getCount(), counterId.toString()));
- toMerge.add(counterId);
- mergeTotal += state.getCount();
- }
- }
- else if (c == 0)
- {
- // Found an old id. However, merging an oldId that has just been renewed isn't safe, so
- // we check that it has been renewed before mergeBefore.
- if (currRecord.timestamp < mergeBefore)
- {
- toMerge.add(counterId);
- mergeTotal += state.getCount();
- }
- }
- }
-
- if (c == 0)
- currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
-
- state.moveToNext();
- }
- // Continuing the iteration so that we can repair invalid shards
- while (state.hasRemaining())
- {
- CounterId counterId = state.getCounterId();
- if (state.isDelta() && state.getClock() < 0)
- {
- if (counterId.equals(localId))
- // we should not get there, but we have been creating problematic context prior to #2968
- throw new RuntimeException("Current counterId with a negative clock (likely due to #2968). You need to restart this node with -Dcassandra.renew_counter_id=true to fix.");
-
- if (state.getCount() != 0)
- {
- // This should not happen, but previous bugs have generated this (#2968 in particular) so fixing it.
- logger.error(String.format("Invalid counter context (clock is %d and count is %d for CounterId %s), will fix", state.getClock(), state.getCount(), counterId.toString()));
- toMerge.add(counterId);
- mergeTotal += state.getCount();
- }
- }
- state.moveToNext();
- }
-
- if (toMerge.isEmpty())
- return null;
-
- ContextState merger = ContextState.allocate(toMerge.size() + 1, toMerge.size() + 1);
- state.reset();
- int i = 0;
- int removedTotal = 0;
- boolean localWritten = false;
- while (state.hasRemaining())
- {
- CounterId counterId = state.getCounterId();
- if (counterId.compareTo(localId) > 0)
- {
- merger.writeElement(localId, 1L, mergeTotal, true);
- localWritten = true;
- }
- else if (i < toMerge.size() && counterId.compareTo(toMerge.get(i)) == 0)
- {
- long count = state.getCount();
- removedTotal += count;
- merger.writeElement(counterId, -now - state.getClock(), -count, true);
- ++i;
- }
- state.moveToNext();
- }
- if (!localWritten)
- merger.writeElement(localId, 1L, mergeTotal, true);
-
- // sanity check
- assert mergeTotal == removedTotal;
- return merger.context;
- }
-
- /**
- * Remove shards that have been canceled through computeOldShardMerger
- * since a time older than gcBefore.
- * Used by compaction to strip context of unecessary information,
- * shrinking them.
- */
- public ByteBuffer removeOldShards(ByteBuffer context, int gcBefore)
- {
- int hlength = headerLength(context);
- ContextState state = new ContextState(context, hlength);
- int removedShards = 0;
- int removedDelta = 0;
- while (state.hasRemaining())
- {
- long clock = state.getClock();
- if (clock < 0)
- {
- // We should never have a count != 0 when clock < 0.
- // We know that previous may have created those situation though, so:
- // * for delta shard: we throw an exception since computeOldShardMerger should
- // have corrected that situation
- // * for non-delta shard: it is a much more crappier situation because there is
- // not much we can do since we are not responsible for that shard. So we simply
- // ignore the shard.
- if (state.getCount() != 0)
- {
- if (state.isDelta())
- {
- throw new IllegalStateException("Counter shard with negative clock but count != 0; context = " + toString(context));
- }
- else
- {
- logger.debug("Ignoring non-removable non-delta corrupted shard in context " + toString(context));
- state.moveToNext();
- continue;
- }
- }
-
- if (-((int)(clock / 1000)) < gcBefore)
- {
- removedShards++;
- if (state.isDelta())
- removedDelta++;
- }
- }
- state.moveToNext();
- }
-
- if (removedShards == 0)
- return context;
-
- int removedHeaderSize = removedDelta * HEADER_ELT_LENGTH;
- int removedBodySize = removedShards * STEP_LENGTH;
- int newSize = context.remaining() - removedHeaderSize - removedBodySize;
- int newHlength = hlength - removedHeaderSize;
- ByteBuffer cleanedContext = HeapAllocator.instance.allocate(newSize);
- cleanedContext.putShort(cleanedContext.position(), (short) ((newHlength - HEADER_SIZE_LENGTH) / HEADER_ELT_LENGTH));
- ContextState cleaned = new ContextState(cleanedContext, newHlength);
-
- state.reset();
- while (state.hasRemaining())
- {
- long clock = state.getClock();
- if (clock >= 0 || state.getCount() != 0 || -((int)(clock / 1000)) >= gcBefore)
- {
- state.copyTo(cleaned);
- }
-
- state.moveToNext();
- }
- return cleanedContext;
- }
-
- /**
* Helper class to work on contexts (works by iterating over them).
* A context being abstractly a list of tuple (counterid, clock, count), a
* ContextState encapsulate a context and a position to one of the tuple.
@@ -775,74 +676,97 @@ public class CounterContext implements IContext
{
public final ByteBuffer context;
public final int headerLength;
- private int headerOffset; // offset from context.position()
- private int bodyOffset; // offset from context.position()
- private boolean currentIsDelta;
- public ContextState(ByteBuffer context, int headerLength)
+ private int headerOffset; // offset from context.position()
+ private int bodyOffset; // offset from context.position()
+ private boolean currentIsGlobal;
+ private boolean currentIsLocal;
+
+ private ContextState(ByteBuffer context)
{
- this(context, headerLength, HEADER_SIZE_LENGTH, headerLength, false);
- updateIsDelta();
+ this.context = context;
+ this.headerLength = this.bodyOffset = headerLength(context);
+ this.headerOffset = HEADER_SIZE_LENGTH;
+ updateIsGlobalOrLocal();
}
- public ContextState(ByteBuffer context)
+ public static ContextState wrap(ByteBuffer context)
{
- this(context, headerLength(context));
+ return new ContextState(context);
}
- private ContextState(ByteBuffer context, int headerLength, int headerOffset, int bodyOffset, boolean currentIsDelta)
+ /**
+ * Allocate a new context big enough for globalCount + localCount + remoteCount elements
+ * and return the initial corresponding ContextState. Only global and local elements get a header entry.
+ */
+ public static ContextState allocate(int globalCount, int localCount, int remoteCount, Allocator allocator)
{
- this.context = context;
- this.headerLength = headerLength;
- this.headerOffset = headerOffset;
- this.bodyOffset = bodyOffset;
- this.currentIsDelta = currentIsDelta;
+ int headerLength = HEADER_SIZE_LENGTH + (globalCount + localCount) * HEADER_ELT_LENGTH;
+ int bodyLength = (globalCount + localCount + remoteCount) * STEP_LENGTH;
+
+ ByteBuffer buffer = allocator.allocate(headerLength + bodyLength);
+ buffer.putShort(buffer.position(), (short) (globalCount + localCount)); // #elt counts header entries only (global + local)
+
+ return ContextState.wrap(buffer);
}
- public boolean isDelta()
+ public boolean isGlobal()
{
- return currentIsDelta;
+ return currentIsGlobal;
}
- private void updateIsDelta()
+ public boolean isLocal()
{
- currentIsDelta = (headerOffset < headerLength) && context.getShort(context.position() + headerOffset) == (short) elementIdx();
+ return currentIsLocal;
}
- public boolean hasRemaining()
+ public boolean isRemote()
{
- return bodyOffset < context.remaining();
+ return !(currentIsGlobal || currentIsLocal);
}
- public int remainingHeaderLength()
+ private void updateIsGlobalOrLocal()
{
- return headerLength - headerOffset;
+ if (headerOffset >= headerLength) // header exhausted: every remaining element is remote
+ {
+ currentIsGlobal = currentIsLocal = false;
+ }
+ else
+ {
+ short headerElt = context.getShort(context.position() + headerOffset);
+ currentIsGlobal = headerElt == getElementIndex() + Short.MIN_VALUE; // global elts are stored as index + Short.MIN_VALUE
+ currentIsLocal = headerElt == getElementIndex(); // local elts are stored as the plain index
+ }
}
- public int remainingBodyLength()
+ public boolean hasRemaining()
{
- return context.remaining() - bodyOffset;
+ return bodyOffset < context.remaining();
}
public void moveToNext()
{
bodyOffset += STEP_LENGTH;
- if (currentIsDelta)
- {
+ if (currentIsGlobal || currentIsLocal)
headerOffset += HEADER_ELT_LENGTH;
- }
- updateIsDelta();
+ updateIsGlobalOrLocal();
}
- // This advance other to the next position (but not this)
public void copyTo(ContextState other)
{
- ByteBufferUtil.arrayCopy(context, context.position() + bodyOffset, other.context, other.context.position() + other.bodyOffset, STEP_LENGTH);
- if (currentIsDelta)
- {
- other.context.putShort(other.context.position() + other.headerOffset, (short) other.elementIdx());
- }
- other.currentIsDelta = currentIsDelta;
+ // Copy the current (counter id, clock, count) tuple into other's current slot, mirror this element's
+ // global/local header marker into other's header, then advance other (but not this). Header writes must
+ // target other.context: writing into this context would leave other's header unset and clobber the source.
+ ByteBufferUtil.arrayCopy(context, context.position() + bodyOffset,
+ other.context, other.context.position() + other.bodyOffset, STEP_LENGTH);
+
+ if (currentIsGlobal)
+ other.context.putShort(other.context.position() + other.headerOffset, (short) (other.getElementIndex() + Short.MIN_VALUE));
+ else if (currentIsLocal)
+ other.context.putShort(other.context.position() + other.headerOffset, (short) other.getElementIndex());
+
+ other.currentIsGlobal = currentIsGlobal;
+ other.currentIsLocal = currentIsLocal;
other.moveToNext();
}
@@ -855,7 +779,12 @@ public class CounterContext implements IContext
{
this.headerOffset = HEADER_SIZE_LENGTH;
this.bodyOffset = headerLength;
- updateIsDelta();
+ updateIsGlobalOrLocal();
+ }
+
+ public int getElementIndex()
+ {
+ return (bodyOffset - headerLength) / STEP_LENGTH; // zero-based index of the current element within the body
}
public CounterId getCounterId()
@@ -873,50 +802,44 @@ public class CounterContext implements IContext
return context.getLong(context.position() + bodyOffset + CounterId.LENGTH + CLOCK_LENGTH);
}
- // Advance this to the next position
- public void writeElement(CounterId id, long clock, long count, boolean isDelta)
+ // In 2.0 only used by the unit tests.
+ public void writeGlobal(CounterId id, long clock, long count)
{
- writeElementAtOffset(context, context.position() + bodyOffset, id, clock, count);
- if (isDelta)
- {
- context.putShort(context.position() + headerOffset, (short)elementIdx());
- }
- currentIsDelta = isDelta;
- moveToNext();
+ writeElement(id, clock, count, true, false);
}
- public void writeElement(CounterId id, long clock, long count)
+ public void writeLocal(CounterId id, long clock, long count)
{
- writeElement(id, clock, count, false);
+ writeElement(id, clock, count, false, true);
}
- public int elementIdx()
+ public void writeRemote(CounterId id, long clock, long count)
{
- return (bodyOffset - headerLength) / STEP_LENGTH;
+ writeElement(id, clock, count, false, false);
}
- public ContextState duplicate()
+ private void writeElement(CounterId id, long clock, long count, boolean isGlobal, boolean isLocal)
{
- return new ContextState(context, headerLength, headerOffset, bodyOffset, currentIsDelta);
- }
+ writeElementAtOffset(context, context.position() + bodyOffset, id, clock, count);
- /*
- * Allocate a new context big enough for {@code elementCount} elements
- * with {@code deltaCount} of them being delta, and return the initial
- * ContextState corresponding.
- */
- public static ContextState allocate(int elementCount, int deltaCount)
- {
- return allocate(elementCount, deltaCount, HeapAllocator.instance);
+ if (isGlobal)
+ context.putShort(context.position() + headerOffset, (short) (getElementIndex() + Short.MIN_VALUE)); // global marker: index + Short.MIN_VALUE
+ else if (isLocal)
+ context.putShort(context.position() + headerOffset, (short) getElementIndex()); // local marker: plain index
+
+ currentIsGlobal = isGlobal;
+ currentIsLocal = isLocal;
+ moveToNext();
}
- public static ContextState allocate(int elementCount, int deltaCount, Allocator allocator)
+ // write a tuple (counter id, clock, count) at an absolute (bytebuffer-wise) offset
+ private void writeElementAtOffset(ByteBuffer ctx, int offset, CounterId id, long clock, long count)
{
- assert deltaCount <= elementCount;
- int hlength = HEADER_SIZE_LENGTH + deltaCount * HEADER_ELT_LENGTH;
- ByteBuffer context = allocator.allocate(hlength + elementCount * STEP_LENGTH);
- context.putShort(context.position(), (short)deltaCount);
- return new ContextState(context, hlength);
+ ctx = ctx.duplicate();
+ ctx.position(offset);
+ ctx.put(id.bytes().duplicate());
+ ctx.putLong(clock);
+ ctx.putLong(count);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 0059fda..e337185 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.utils.CounterId;
+import org.apache.cassandra.utils.HeapAllocator;
import org.apache.cassandra.utils.Pair;
public abstract class AbstractSSTableSimpleWriter
@@ -154,7 +155,9 @@ public abstract class AbstractSSTableSimpleWriter
*/
public void addCounterColumn(ByteBuffer name, long value)
{
- addColumn(new CounterColumn(name, CounterContext.instance().create(counterid, 1L, value, false), System.currentTimeMillis()));
+ addColumn(new CounterColumn(name,
+ CounterContext.instance().createRemote(counterid, 1L, value, HeapAllocator.instance),
+ System.currentTimeMillis()));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 3d19d83..81b3c27 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -246,7 +246,7 @@ public class SSTableWriter extends SSTable
if (atom == null)
break;
if (atom instanceof CounterColumn)
- atom = ((CounterColumn) atom).markDeltaToBeCleared();
+ atom = ((CounterColumn) atom).markLocalToBeCleared();
int deletionTime = atom.getLocalDeletionTime();
if (deletionTime < Integer.MAX_VALUE)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/test/unit/org/apache/cassandra/db/CounterColumnTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterColumnTest.java b/test/unit/org/apache/cassandra/db/CounterColumnTest.java
index 7c2be9f..1f2c078 100644
--- a/test/unit/org/apache/cassandra/db/CounterColumnTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterColumnTest.java
@@ -32,10 +32,11 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.db.context.CounterContext;
-import static org.apache.cassandra.db.context.CounterContext.ContextState;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.utils.*;
+import static org.apache.cassandra.db.context.CounterContext.ContextState;
+
public class CounterColumnTest extends SchemaLoader
{
private static final CounterContext cc = new CounterContext();
@@ -66,8 +67,8 @@ public class CounterColumnTest extends SchemaLoader
assert 1 == column.value().getShort(0);
assert 0 == column.value().getShort(2);
assert CounterId.wrap(column.value(), 4).isLocalId();
- assert 1L == column.value().getLong(4 + 0*stepLength + idLength);
- assert delta == column.value().getLong(4 + 0*stepLength + idLength + clockLength);
+ assert 1L == column.value().getLong(4 + idLength);
+ assert delta == column.value().getLong(4 + idLength + clockLength);
}
@Test
@@ -79,6 +80,8 @@ public class CounterColumnTest extends SchemaLoader
ByteBuffer context;
+ Allocator allocator = HeapAllocator.instance;
+
// tombstone + tombstone
left = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 1L);
right = new DeletedColumn(ByteBufferUtil.bytes("x"), 2, 2L);
@@ -143,20 +146,20 @@ public class CounterColumnTest extends SchemaLoader
assert ((CounterColumn)reconciled).timestampOfLastDelete() == right.getMarkedForDeleteAt();
// live < live last delete
- left = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(CounterId.fromInt(1), 2L, 3L, false), 1L, Long.MIN_VALUE);
- right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(CounterId.fromInt(1), 1L, 1L, false), 4L, 3L);
+ left = new CounterColumn(ByteBufferUtil.bytes("x"), cc.createRemote(CounterId.fromInt(1), 2L, 3L, allocator), 1L, Long.MIN_VALUE);
+ right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.createRemote(CounterId.fromInt(1), 1L, 1L, allocator), 4L, 3L);
assert left.reconcile(right) == right;
// live last delete > live
- left = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(CounterId.fromInt(1), 2L, 3L, false), 6L, 5L);
- right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(CounterId.fromInt(1), 1L, 1L, false), 4L, 3L);
+ left = new CounterColumn(ByteBufferUtil.bytes("x"), cc.createRemote(CounterId.fromInt(1), 2L, 3L, allocator), 6L, 5L);
+ right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.createRemote(CounterId.fromInt(1), 1L, 1L, allocator), 4L, 3L);
assert left.reconcile(right) == left;
// live + live
- left = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(CounterId.fromInt(1), 1L, 1L, false), 4L, Long.MIN_VALUE);
- right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(CounterId.fromInt(1), 2L, 3L, false), 1L, Long.MIN_VALUE);
+ left = new CounterColumn(ByteBufferUtil.bytes("x"), cc.createRemote(CounterId.fromInt(1), 1L, 1L, allocator), 4L, Long.MIN_VALUE);
+ right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.createRemote(CounterId.fromInt(1), 2L, 3L, allocator), 1L, Long.MIN_VALUE);
reconciled = left.reconcile(right);
assert reconciled.name().equals(left.name());
@@ -164,7 +167,7 @@ public class CounterColumnTest extends SchemaLoader
assert reconciled.timestamp() == 4L;
left = reconciled;
- right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(CounterId.fromInt(2), 1L, 5L, false), 2L, Long.MIN_VALUE);
+ right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.createRemote(CounterId.fromInt(2), 1L, 5L, allocator), 2L, Long.MIN_VALUE);
reconciled = left.reconcile(right);
assert reconciled.name().equals(left.name());
@@ -172,7 +175,7 @@ public class CounterColumnTest extends SchemaLoader
assert reconciled.timestamp() == 4L;
left = reconciled;
- right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(CounterId.fromInt(2), 2L, 2L, false), 6L, Long.MIN_VALUE);
+ right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.createRemote(CounterId.fromInt(2), 2L, 2L, allocator), 6L, Long.MIN_VALUE);
reconciled = left.reconcile(right);
assert reconciled.name().equals(left.name());
@@ -183,13 +186,13 @@ public class CounterColumnTest extends SchemaLoader
int hd = 2; // header
assert hd + 2 * stepLength == context.remaining();
- assert Util.equalsCounterId(CounterId.fromInt(1), context, hd + 0 * stepLength);
- assert 2L == context.getLong(hd + 0*stepLength + idLength);
- assert 3L == context.getLong(hd + 0*stepLength + idLength + clockLength);
+ assert Util.equalsCounterId(CounterId.fromInt(1), context, hd);
+ assert 2L == context.getLong(hd + idLength);
+ assert 3L == context.getLong(hd + idLength + clockLength);
- assert Util.equalsCounterId(CounterId.fromInt(2), context, hd + 1 * stepLength);
- assert 2L == context.getLong(hd + 1*stepLength + idLength);
- assert 2L == context.getLong(hd + 1*stepLength + idLength + clockLength);
+ assert Util.equalsCounterId(CounterId.fromInt(2), context, hd + stepLength);
+ assert 2L == context.getLong(hd + stepLength + idLength);
+ assert 2L == context.getLong(hd + stepLength + idLength + clockLength);
assert ((CounterColumn)reconciled).timestampOfLastDelete() == Long.MIN_VALUE;
}
@@ -219,27 +222,27 @@ public class CounterColumnTest extends SchemaLoader
assert null == rightCol.diff(leftCol);
// equality: equal nodes, all counts same
- left = ContextState.allocate(3, 0, allocator);
- left.writeElement(CounterId.fromInt(3), 3L, 0L);
- left.writeElement(CounterId.fromInt(6), 2L, 0L);
- left.writeElement(CounterId.fromInt(9), 1L, 0L);
- right = new ContextState(ByteBufferUtil.clone(left.context), 2);
+ left = ContextState.allocate(0, 0, 3, allocator);
+ left.writeRemote(CounterId.fromInt(3), 3L, 0L);
+ left.writeRemote(CounterId.fromInt(6), 2L, 0L);
+ left.writeRemote(CounterId.fromInt(9), 1L, 0L);
+ right = ContextState.wrap(ByteBufferUtil.clone(left.context));
leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), left.context, 1L);
rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), right.context, 1L);
assert leftCol.diff(rightCol) == null;
// greater than: left has superset of nodes (counts equal)
- left = ContextState.allocate(4, 0, allocator);
- left.writeElement(CounterId.fromInt(3), 3L, 0L);
- left.writeElement(CounterId.fromInt(6), 2L, 0L);
- left.writeElement(CounterId.fromInt(9), 1L, 0L);
- left.writeElement(CounterId.fromInt(12), 0L, 0L);
+ left = ContextState.allocate(0, 0, 4, allocator);
+ left.writeRemote(CounterId.fromInt(3), 3L, 0L);
+ left.writeRemote(CounterId.fromInt(6), 2L, 0L);
+ left.writeRemote(CounterId.fromInt(9), 1L, 0L);
+ left.writeRemote(CounterId.fromInt(12), 0L, 0L);
- right = ContextState.allocate(3, 0, allocator);
- right.writeElement(CounterId.fromInt(3), 3L, 0L);
- right.writeElement(CounterId.fromInt(6), 2L, 0L);
- right.writeElement(CounterId.fromInt(9), 1L, 0L);
+ right = ContextState.allocate(0, 0, 3, allocator);
+ right.writeRemote(CounterId.fromInt(3), 3L, 0L);
+ right.writeRemote(CounterId.fromInt(6), 2L, 0L);
+ right.writeRemote(CounterId.fromInt(9), 1L, 0L);
leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), left.context, 1L);
rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), right.context, 1L);
@@ -249,15 +252,15 @@ public class CounterColumnTest extends SchemaLoader
assert leftCol == rightCol.diff(leftCol);
// disjoint: right and left have disjoint node sets
- left = ContextState.allocate(3, 0, allocator);
- left.writeElement(CounterId.fromInt(3), 1L, 0L);
- left.writeElement(CounterId.fromInt(4), 1L, 0L);
- left.writeElement(CounterId.fromInt(9), 1L, 0L);
+ left = ContextState.allocate(0, 0, 3, allocator);
+ left.writeRemote(CounterId.fromInt(3), 1L, 0L);
+ left.writeRemote(CounterId.fromInt(4), 1L, 0L);
+ left.writeRemote(CounterId.fromInt(9), 1L, 0L);
- right = ContextState.allocate(3, 0, allocator);
- right.writeElement(CounterId.fromInt(3), 1L, 0L);
- right.writeElement(CounterId.fromInt(6), 1L, 0L);
- right.writeElement(CounterId.fromInt(9), 1L, 0L);
+ right = ContextState.allocate(0, 0, 3, allocator);
+ right.writeRemote(CounterId.fromInt(3), 1L, 0L);
+ right.writeRemote(CounterId.fromInt(6), 1L, 0L);
+ right.writeRemote(CounterId.fromInt(9), 1L, 0L);
leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), left.context, 1L);
rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), right.context, 1L);
@@ -269,11 +272,11 @@ public class CounterColumnTest extends SchemaLoader
public void testSerializeDeserialize() throws IOException
{
Allocator allocator = HeapAllocator.instance;
- CounterContext.ContextState state = CounterContext.ContextState.allocate(4, 2, allocator);
- state.writeElement(CounterId.fromInt(1), 4L, 4L);
- state.writeElement(CounterId.fromInt(2), 4L, 4L, true);
- state.writeElement(CounterId.fromInt(3), 4L, 4L);
- state.writeElement(CounterId.fromInt(4), 4L, 4L, true);
+ CounterContext.ContextState state = CounterContext.ContextState.allocate(0, 2, 2, allocator);
+ state.writeRemote(CounterId.fromInt(1), 4L, 4L);
+ state.writeLocal(CounterId.fromInt(2), 4L, 4L);
+ state.writeRemote(CounterId.fromInt(3), 4L, 4L);
+ state.writeLocal(CounterId.fromInt(4), 4L, 4L);
CounterColumn original = new CounterColumn(ByteBufferUtil.bytes("x"), state.context, 1L);
DataOutputBuffer bufOut = new DataOutputBuffer();
@@ -288,7 +291,7 @@ public class CounterColumnTest extends SchemaLoader
CounterColumn deserializedOnRemote = (CounterColumn) Column.serializer.deserialize(new DataInputStream(bufIn), ColumnSerializer.Flag.FROM_REMOTE);
assert deserializedOnRemote.name().equals(original.name());
assert deserializedOnRemote.total() == original.total();
- assert deserializedOnRemote.value().equals(cc.clearAllDelta(original.value()));
+ assert deserializedOnRemote.value().equals(cc.clearAllLocal(original.value()));
assert deserializedOnRemote.timestamp() == deserialized.timestamp();
assert deserializedOnRemote.timestampOfLastDelete() == deserialized.timestampOfLastDelete();
}
@@ -300,14 +303,14 @@ public class CounterColumnTest extends SchemaLoader
MessageDigest digest1 = MessageDigest.getInstance("md5");
MessageDigest digest2 = MessageDigest.getInstance("md5");
- CounterContext.ContextState state = CounterContext.ContextState.allocate(4, 2, allocator);
- state.writeElement(CounterId.fromInt(1), 4L, 4L);
- state.writeElement(CounterId.fromInt(2), 4L, 4L, true);
- state.writeElement(CounterId.fromInt(3), 4L, 4L);
- state.writeElement(CounterId.fromInt(4), 4L, 4L, true);
+ CounterContext.ContextState state = CounterContext.ContextState.allocate(0, 2, 2, allocator);
+ state.writeRemote(CounterId.fromInt(1), 4L, 4L);
+ state.writeLocal(CounterId.fromInt(2), 4L, 4L);
+ state.writeRemote(CounterId.fromInt(3), 4L, 4L);
+ state.writeLocal(CounterId.fromInt(4), 4L, 4L);
CounterColumn original = new CounterColumn(ByteBufferUtil.bytes("x"), state.context, 1L);
- CounterColumn cleared = new CounterColumn(ByteBufferUtil.bytes("x"), cc.clearAllDelta(state.context), 1L);
+ CounterColumn cleared = new CounterColumn(ByteBufferUtil.bytes("x"), cc.clearAllLocal(state.context), 1L);
original.updateDigest(digest1);
cleared.updateDigest(digest2);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/test/unit/org/apache/cassandra/db/CounterMutationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterMutationTest.java b/test/unit/org/apache/cassandra/db/CounterMutationTest.java
index 3cbd030..389b7b7 100644
--- a/test/unit/org/apache/cassandra/db/CounterMutationTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterMutationTest.java
@@ -18,87 +18,16 @@
package org.apache.cassandra.db;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
import java.util.List;
import org.junit.Test;
-import static org.junit.Assert.fail;
-import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.utils.*;
-import org.apache.cassandra.Util;
-import static org.apache.cassandra.db.context.CounterContext.ContextState;
public class CounterMutationTest extends SchemaLoader
{
@Test
- public void testMergeOldShards() throws IOException
- {
- RowMutation rm;
- CounterMutation cm;
-
- CounterId id1 = CounterId.getLocalId();
-
- rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
- rm.addCounter("Counter1", ByteBufferUtil.bytes("Column1"), 3);
- cm = new CounterMutation(rm, ConsistencyLevel.ONE);
- cm.apply();
-
- CounterId.renewLocalId(2L); // faking time of renewal for test
- CounterId id2 = CounterId.getLocalId();
-
- rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
- rm.addCounter("Counter1", ByteBufferUtil.bytes("Column1"), 4);
- cm = new CounterMutation(rm, ConsistencyLevel.ONE);
- cm.apply();
-
- CounterId.renewLocalId(4L); // faking time of renewal for test
- CounterId id3 = CounterId.getLocalId();
-
- rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
- rm.addCounter("Counter1", ByteBufferUtil.bytes("Column1"), 5);
- rm.addCounter("Counter1", ByteBufferUtil.bytes("Column2"), 1);
- cm = new CounterMutation(rm, ConsistencyLevel.ONE);
- cm.apply();
-
- DecoratedKey dk = Util.dk("key1");
- ColumnFamily cf = Util.getColumnFamily(Keyspace.open("Keyspace1"), dk, "Counter1");
-
- // First merges old shards
- CounterColumn.mergeAndRemoveOldShards(dk, cf, Integer.MIN_VALUE, Integer.MAX_VALUE, false);
- long now = System.currentTimeMillis();
- Column c = cf.getColumn(ByteBufferUtil.bytes("Column1"));
- assert c != null;
- assert c instanceof CounterColumn;
- assert ((CounterColumn)c).total() == 12L;
- ContextState s = new ContextState(c.value());
- assert s.getCounterId().equals(id1);
- assert s.getCount() == 0L;
- assert -s.getClock() > now - 1000 : " >";
- assert -s.getClock() <= now;
- s.moveToNext();
- assert s.getCounterId().equals(id2);
- assert s.getCount() == 0L;
- assert -s.getClock() > now - 1000;
- assert -s.getClock() <= now;
- s.moveToNext();
- assert s.getCounterId().equals(id3);
- assert s.getCount() == 12L;
-
- // Then collect old shards
- CounterColumn.mergeAndRemoveOldShards(dk, cf, Integer.MAX_VALUE, Integer.MIN_VALUE, false);
- c = cf.getColumn(ByteBufferUtil.bytes("Column1"));
- assert c != null;
- assert c instanceof CounterColumn;
- assert ((CounterColumn)c).total() == 12L;
- s = new ContextState(c.value());
- assert s.getCounterId().equals(id3);
- assert s.getCount() == 12L;
- }
-
- @Test
public void testGetOldShardFromSystemKeyspace() throws IOException
{
// Renewing a bunch of times and checking we get the same thing from
@@ -112,49 +41,5 @@ public class CounterMutationTest extends SchemaLoader
assert inMem.equals(onDisk);
}
-
- @Test
- public void testRemoveOldShardFixCorrupted() throws IOException
- {
- CounterContext ctx = CounterContext.instance();
- int now = (int) (System.currentTimeMillis() / 1000);
-
- // Check that corrupted context created prior to #2968 are fixed by removeOldShards
- CounterId id1 = CounterId.getLocalId();
- CounterId.renewLocalId();
- CounterId id2 = CounterId.getLocalId();
-
- ContextState state = ContextState.allocate(3, 2);
- state.writeElement(CounterId.fromInt(1), 1, 4, false);
- state.writeElement(id1, 3, 2, true);
- state.writeElement(id2, -100, 5, true); // corrupted!
-
- assert ctx.total(state.context) == 11;
-
- try
- {
- ByteBuffer merger = ctx.computeOldShardMerger(state.context, Collections.<CounterId.CounterIdRecord>emptyList(), 0);
- ctx.removeOldShards(ctx.merge(state.context, merger, HeapAllocator.instance), now);
- fail("RemoveOldShards should throw an exception if the current id is non-sensical");
- }
- catch (RuntimeException e) {}
-
- CounterId.renewLocalId();
- ByteBuffer merger = ctx.computeOldShardMerger(state.context, Collections.<CounterId.CounterIdRecord>emptyList(), 0);
- ByteBuffer cleaned = ctx.removeOldShards(ctx.merge(state.context, merger, HeapAllocator.instance), now);
- assert ctx.total(cleaned) == 11;
-
- // Check it is not corrupted anymore
- ContextState state2 = new ContextState(cleaned);
- while (state2.hasRemaining())
- {
- assert state2.getClock() >= 0 || state2.getCount() == 0;
- state2.moveToNext();
- }
-
- // Check that if we merge old and clean on another node, we keep the right count
- ByteBuffer onRemote = ctx.merge(ctx.clearAllDelta(state.context), ctx.clearAllDelta(cleaned), HeapAllocator.instance);
- assert ctx.total(onRemote) == 11;
- }
}