You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/11/07 14:36:29 UTC
svn commit: r1198732 - in /cassandra/trunk: ./ contrib/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cass...
Author: slebresne
Date: Mon Nov 7 13:36:28 2011
New Revision: 1198732
URL: http://svn.apache.org/viewvc?rev=1198732&view=rev
Log:
merge from 1.0.
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/test/unit/org/apache/cassandra/db/CounterMutationTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov 7 13:36:28 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1196956
/cassandra/branches/cassandra-0.8.0:1125021-1130369
/cassandra/branches/cassandra-0.8.1:1101014-1125018
-/cassandra/branches/cassandra-1.0:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726
+/cassandra/branches/cassandra-1.0:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726,1198731
/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Nov 7 13:36:28 2011
@@ -3,6 +3,8 @@
* EACH_QUORUM is only supported for writes (CASSANDRA-3272)
* replace compactionlock use in schema migration by checking CFS.isInvalidD
* recognize that "SELECT first ... *" isn't really "SELECT *" (CASSANDRA-3445)
+Merged from 0.8:
+ * Make counter shard merging thread safe (CASSANDRA-3178)
1.0.2
* "defragment" rows for name-based queries under STCS (CASSANDRA-2503)
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov 7 13:36:28 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1196956
/cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
/cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
-/cassandra/branches/cassandra-1.0/contrib:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726
+/cassandra/branches/cassandra-1.0/contrib:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726,1198731
/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov 7 13:36:28 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1196956
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726,1198731
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov 7 13:36:28 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1196956
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726,1198731
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov 7 13:36:28 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1196956
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726,1198731
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov 7 13:36:28 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1196956
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726,1198731
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov 7 13:36:28 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1196956
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1196958,1197123,1197130,1197417,1197420,1197426,1197494,1197771,1198726,1198731
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Mon Nov 7 13:36:28 2011
@@ -119,6 +119,8 @@ public final class CFMetaData
private int rowCacheSavePeriodInSeconds; // default 0 (off)
private int keyCacheSavePeriodInSeconds; // default 3600 (1 hour)
private int rowCacheKeysToSave; // default max int (aka feature is off)
+ // mergeShardsChance is now obsolete, but left here so as to not break
+ // thrift compatibility
private double mergeShardsChance; // default 0.1, chance [0.0, 1.0] of merging old shards during replication
private IRowCacheProvider rowCacheProvider;
private ByteBuffer keyAlias; // default NULL
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Nov 7 13:36:28 2011
@@ -1904,4 +1904,16 @@ public class ColumnFamilyStore implement
this.memtables = memtables;
}
}
+
+ /**
+ * Returns the creation time of the oldest memtable not fully flushed yet.
+ */
+ public long oldestUnflushedMemtable()
+ {
+ DataTracker.View view = data.getView();
+ long oldest = view.memtable.creationTime();
+ for (Memtable memtable : view.memtablesPendingFlush)
+ oldest = Math.min(oldest, memtable.creationTime());
+ return oldest;
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java Mon Nov 7 13:36:28 2011
@@ -20,20 +20,29 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.net.InetAddress;
import java.security.MessageDigest;
+import java.util.concurrent.TimeoutException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Collection;
+import com.google.common.collect.Multimap;
import org.apache.log4j.Logger;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.context.IContext.ContextRelationship;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.HeapAllocator;
-import org.apache.cassandra.utils.NodeId;
+import org.apache.cassandra.service.IWriteResponseHandler;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.*;
/**
* A column that represents a partitioned counter.
@@ -236,9 +245,9 @@ public class CounterColumn extends Colum
return contextManager.hasNodeId(value(), id);
}
- public CounterColumn computeOldShardMerger()
+ private CounterColumn computeOldShardMerger(int mergeBefore)
{
- ByteBuffer bb = contextManager.computeOldShardMerger(value(), NodeId.getOldLocalNodeIds());
+ ByteBuffer bb = contextManager.computeOldShardMerger(value(), NodeId.getOldLocalNodeIds(), mergeBefore);
if (bb == null)
return null;
else
@@ -256,17 +265,48 @@ public class CounterColumn extends Colum
}
}
- public static void removeOldShards(ColumnFamily cf, int gcBefore)
+ public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily cf, int gcBefore, int mergeBefore)
{
+ mergeAndRemoveOldShards(key, cf, gcBefore, mergeBefore, true);
+ }
+
+ /**
+ * There are two phases to the removal of old shards.
+ * First phase: we merge the old shard value into the current shard and
+ * 'nullify' the old one. We then send the counter context with the old
+ * shard nullified to all other replicas.
+ * Second phase: once an old shard has been nullified for longer than
+ * gc_grace (to be sure all other replicas are aware of the merge), we
+ * simply remove that old shard from the context (its value is 0).
+ * This method does both phases.
+ * (Note that the sendToOtherReplica flag is here only to facilitate
+ * testing. It should be true in real code, so preferably use the method
+ * above.)
+ */
+ public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily cf, int gcBefore, int mergeBefore, boolean sendToOtherReplica)
+ {
+ ColumnFamily remoteMerger = null;
if (!cf.isSuper())
{
for (IColumn c : cf)
{
if (!(c instanceof CounterColumn))
continue;
- CounterColumn cleaned = ((CounterColumn) c).removeOldShards(gcBefore);
- if (cleaned != c)
- cf.replace(c, cleaned);
+ CounterColumn cc = (CounterColumn) c;
+ CounterColumn shardMerger = cc.computeOldShardMerger(mergeBefore);
+ CounterColumn merged = cc;
+ if (shardMerger != null)
+ {
+ merged = (CounterColumn) cc.reconcile(shardMerger);
+ if (remoteMerger == null)
+ remoteMerger = cf.cloneMeShallow();
+ remoteMerger.addColumn(merged);
+ }
+ CounterColumn cleaned = merged.removeOldShards(gcBefore);
+ if (cleaned != cc)
+ {
+ cf.replace(cc, cleaned);
+ }
}
}
else
@@ -278,16 +318,61 @@ public class CounterColumn extends Colum
{
if (!(subColumn instanceof CounterColumn))
continue;
- CounterColumn cleaned = ((CounterColumn) subColumn).removeOldShards(gcBefore);
+ CounterColumn cc = (CounterColumn) subColumn;
+ CounterColumn shardMerger = cc.computeOldShardMerger(mergeBefore);
+ CounterColumn merged = cc;
+ if (shardMerger != null)
+ {
+ merged = (CounterColumn) cc.reconcile(shardMerger);
+ if (remoteMerger == null)
+ remoteMerger = cf.cloneMeShallow();
+ remoteMerger.addColumn(c.name(), merged);
+ }
+ CounterColumn cleaned = merged.removeOldShards(gcBefore);
if (cleaned != subColumn)
c.replace(subColumn, cleaned);
}
}
}
+
+ if (remoteMerger != null && sendToOtherReplica)
+ {
+ try
+ {
+ sendToOtherReplica(key, remoteMerger);
+ }
+ catch (Exception e)
+ {
+ logger.error("Error while sending shard merger mutation to remote endpoints", e);
+ }
+ }
}
public IColumn markDeltaToBeCleared()
{
return new CounterColumn(name, contextManager.markDeltaToBeCleared(value), timestamp, timestampOfLastDelete);
}
+
+ private static void sendToOtherReplica(DecoratedKey key, ColumnFamily cf) throws UnavailableException, TimeoutException, IOException
+ {
+ RowMutation rm = new RowMutation(cf.metadata().ksName, key.key);
+ rm.add(cf);
+
+ final InetAddress local = FBUtilities.getBroadcastAddress();
+ String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(local);
+
+ StorageProxy.performWrite(rm, ConsistencyLevel.ANY, localDataCenter, new StorageProxy.WritePerformer()
+ {
+ public void apply(IMutation mutation, Collection<InetAddress> targets, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException, TimeoutException
+ {
+ // We should only send to the remote replica, not the local one
+ targets.remove(local);
+ // Fake local response to be a good lad but we won't wait on the responseHandler
+ responseHandler.response(null);
+ StorageProxy.sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter, consistency_level);
+ }
+ });
+
+ // we don't wait for answers
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java Mon Nov 7 13:36:28 2011
@@ -100,7 +100,6 @@ public class CounterMutation implements
if (row == null || row.cf == null)
continue;
- row = mergeOldShards(readCommand.table, row);
ColumnFamily cf = row.cf;
if (cf.isSuper())
cf.retainAll(rowMutation.getColumnFamily(cf.metadata().cfId));
@@ -115,73 +114,6 @@ public class CounterMutation implements
commands.add(new SliceByNamesReadCommand(table, key, queryPath, columnFamily.getColumnNames()));
}
- private Row mergeOldShards(String table, Row row) throws IOException
- {
- ColumnFamily cf = row.cf;
- // random check for merging to allow lessening the performance impact
- if (cf.metadata().getMergeShardsChance() > FBUtilities.threadLocalRandom().nextDouble())
- {
- ColumnFamily merger = computeShardMerger(cf);
- if (merger != null)
- {
- RowMutation localMutation = new RowMutation(table, row.key.key);
- localMutation.add(merger);
- localMutation.apply();
-
- cf.addAll(merger, HeapAllocator.instance);
- }
- }
- return row;
- }
-
- private ColumnFamily computeShardMerger(ColumnFamily cf)
- {
- ColumnFamily merger = null;
-
- // CF type: regular
- if (!cf.isSuper())
- {
- for (IColumn column : cf)
- {
- if (!(column instanceof CounterColumn))
- continue;
- IColumn c = ((CounterColumn)column).computeOldShardMerger();
- if (c != null)
- {
- if (merger == null)
- merger = cf.cloneMeShallow();
- merger.addColumn(c);
- }
- }
- }
- else // CF type: super
- {
- for (IColumn superColumn : cf)
- {
- IColumn mergerSuper = null;
- for (IColumn column : superColumn.getSubColumns())
- {
- if (!(column instanceof CounterColumn))
- continue;
- IColumn c = ((CounterColumn)column).computeOldShardMerger();
- if (c != null)
- {
- if (mergerSuper == null)
- mergerSuper = ((SuperColumn)superColumn).cloneMeShallow();
- mergerSuper.addColumn(c);
- }
- }
- if (mergerSuper != null)
- {
- if (merger == null)
- merger = cf.cloneMeShallow();
- merger.addColumn(mergerSuper);
- }
- }
- }
- return merger;
- }
-
public Message makeMutationMessage(int version) throws IOException
{
byte[] bytes = FBUtilities.serialize(this, serializer, version);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Mon Nov 7 13:36:28 2011
@@ -76,12 +76,14 @@ public class Memtable
private final ConcurrentNavigableMap<DecoratedKey, ColumnFamily> columnFamilies = new ConcurrentSkipListMap<DecoratedKey, ColumnFamily>();
public final ColumnFamilyStore cfs;
+ private final long creationTime;
private SlabAllocator allocator = new SlabAllocator();
public Memtable(ColumnFamilyStore cfs)
{
this.cfs = cfs;
+ this.creationTime = System.currentTimeMillis();
Callable<Set<Object>> provider = new Callable<Set<Object>>()
{
@@ -397,4 +399,9 @@ public class Memtable
{
columnFamilies.clear();
}
+
+ public long creationTime()
+ {
+ return creationTime;
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java Mon Nov 7 13:36:28 2011
@@ -45,6 +45,7 @@ public class CompactionController
public final int gcBefore;
public boolean keyExistenceIsExpensive;
+ public final int mergeShardBefore;
public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
{
@@ -52,6 +53,11 @@ public class CompactionController
this.cfs = cfs;
this.sstables = new HashSet<SSTableReader>(sstables);
this.gcBefore = gcBefore;
+ // If we merge an old NodeId, we must make sure that no further increments for that id are in an active memtable.
+ // For that, we must make sure that this id was renewed before the creation of the oldest unflushed memtable. We
+ // add 5 minutes to be sure we're on the safe side in terms of thread safety (though we should be fine in our
+ // current 'stop all writes during memtable switch' situation).
+ this.mergeShardBefore = (int) ((cfs.oldestUnflushedMemtable() + 5 * 3600) / 1000);
this.forceDeserialize = forceDeserialize;
keyExistenceIsExpensive = cfs.getCompactionStrategy().isKeyExistenceExpensive(this.sstables);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java Mon Nov 7 13:36:28 2011
@@ -222,7 +222,7 @@ public class LazilyCompactedRow extends
protected IColumn getReduced()
{
- ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(shouldPurge, controller, container);
+ ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
if (purged == null || !purged.iterator().hasNext())
{
container.clear();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java Mon Nov 7 13:36:28 2011
@@ -83,17 +83,17 @@ public class PrecompactedRow extends Abs
if (shouldPurge == null)
shouldPurge = controller.shouldPurge(key);
if (shouldPurge)
- CounterColumn.removeOldShards(compacted, controller.gcBefore);
+ CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
}
return compacted;
}
- public static ColumnFamily removeDeletedAndOldShards(boolean shouldPurge, CompactionController controller, ColumnFamily cf)
+ public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
{
ColumnFamily compacted = shouldPurge ? ColumnFamilyStore.removeDeleted(cf, controller.gcBefore) : cf;
if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
- CounterColumn.removeOldShards(compacted, controller.gcBefore);
+ CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
return compacted;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java Mon Nov 7 13:36:28 2011
@@ -71,10 +71,6 @@ public class CounterContext implements I
private static final Logger logger = Logger.getLogger(CounterContext.class);
- // Time in ms since a node id has been renewed before we consider using it
- // during a merge
- private static final long MIN_MERGE_DELAY = 5 * 60 * 1000; // should be aplenty
-
// lazy-load singleton
private static class LazyHolder
{
@@ -531,84 +527,123 @@ public class CounterContext implements I
/**
* Compute a new context such that if applied to context yields the same
- * total but with the older local node id merged into the second to older one
- * (excluding current local node id) if need be.
+ * total but with old local node ids nullified and their content merged into
+ * the current localNodeId.
*/
- public ByteBuffer computeOldShardMerger(ByteBuffer context, List<NodeId.NodeIdRecord> oldIds)
+ public ByteBuffer computeOldShardMerger(ByteBuffer context, List<NodeId.NodeIdRecord> oldIds, long mergeBefore)
{
long now = System.currentTimeMillis();
int hlength = headerLength(context);
-
- // Don't bother if we know we can't find what we are looking for
- if (oldIds.size() < 2
- || now - oldIds.get(0).timestamp < MIN_MERGE_DELAY
- || now - oldIds.get(1).timestamp < MIN_MERGE_DELAY
- || context.remaining() - hlength < 2 * STEP_LENGTH)
- return null;
+ NodeId localId = NodeId.getLocalId();
Iterator<NodeId.NodeIdRecord> recordIterator = oldIds.iterator();
- NodeId.NodeIdRecord currRecord = recordIterator.next();
+ NodeId.NodeIdRecord currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
ContextState state = new ContextState(context, hlength);
ContextState foundState = null;
+ List<NodeId> toMerge = new ArrayList<NodeId>();
+ long mergeTotal = 0;
while (state.hasRemaining() && currRecord != null)
{
- if (now - currRecord.timestamp < MIN_MERGE_DELAY)
- return context;
+ assert !currRecord.id.equals(localId);
- assert !currRecord.id.equals(NodeId.getLocalId());
+ NodeId nodeId = state.getNodeId();
+ int c = nodeId.compareTo(currRecord.id);
- int c = state.getNodeId().compareTo(currRecord.id);
- if (c == 0)
+ if (c > 0)
+ {
+ currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
+ continue;
+ }
+
+ if (state.isDelta())
{
- if (foundState == null)
+ if (state.getClock() < 0)
{
- // We found a canditate for being merged
- if (state.getClock() < 0)
- return null;
-
- foundState = state.duplicate();
- currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
- state.moveToNext();
+ // Already merged shard, waiting to be collected
+
+ if (nodeId.equals(localId))
+ // we should not get there, but we have been creating problematic context prior to #2968
+ throw new RuntimeException("Current nodeId with a negative clock (likely due to #2968). You need to restart this node with -Dcassandra.renew_counter_id=true to fix.");
+
+ if (state.getCount() != 0)
+ {
+ // This should not happen, but previous bugs have generated this (#2968 in particular) so fixing it.
+ logger.error(String.format("Invalid counter context (clock is %d and count is %d for NodeId %s), will fix", state.getClock(), state.getCount(), nodeId.toString())); // first %d is the clock, as in the second repair loop
+ toMerge.add(nodeId);
+ mergeTotal += state.getCount();
+ }
}
- else
+ else if (c == 0)
{
- assert !foundState.getNodeId().equals(state.getNodeId());
-
- // Found someone to merge it to
- int nbDelta = foundState.isDelta() ? 1 : 0;
- nbDelta += state.isDelta() ? 1 : 0;
- ContextState merger = ContextState.allocate(2, nbDelta, HeapAllocator.instance);
-
- long fclock = foundState.getClock();
- long fcount = foundState.getCount();
- long clock = state.getClock();
- long count = state.getCount();
+ // Found an old id. However, merging an oldId that has just been renewed isn't safe, so
+ // we check that it has been renewed before mergeBefore.
+ if (currRecord.timestamp < mergeBefore)
+ {
+ toMerge.add(nodeId);
+ mergeTotal += state.getCount();
+ }
+ }
+ }
- if (foundState.isDelta())
- merger.writeElement(foundState.getNodeId(), -now - fclock, -fcount, true);
- else
- merger.writeElement(foundState.getNodeId(), -now, 0);
+ if (c == 0)
+ currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
- if (state.isDelta())
- merger.writeElement(state.getNodeId(), fclock + clock, fcount, true);
- else
- merger.writeElement(state.getNodeId(), fclock + clock, fcount + count);
+ state.moveToNext();
+ }
+ // Continuing the iteration so that we can repair invalid shards
+ while (state.hasRemaining())
+ {
+ NodeId nodeId = state.getNodeId();
+ if (state.isDelta() && state.getClock() < 0)
+ {
+ if (nodeId.equals(localId))
+ // we should not get there, but we have been creating problematic context prior to #2968
+ throw new RuntimeException("Current nodeId with a negative clock (likely due to #2968). You need to restart this node with -Dcassandra.renew_counter_id=true to fix.");
- return merger.context;
+ if (state.getCount() != 0)
+ {
+ // This should not happen, but previous bugs have generated this (#2968 in particular) so fixing it.
+ logger.error(String.format("Invalid counter context (clock is %d and count is %d for NodeId %s), will fix", state.getClock(), state.getCount(), nodeId.toString()));
+ toMerge.add(nodeId);
+ mergeTotal += state.getCount();
}
}
- else if (c < 0) // nodeid < record
+ state.moveToNext();
+ }
+
+ if (toMerge.isEmpty())
+ return null;
+
+ ContextState merger = ContextState.allocate(toMerge.size() + 1, toMerge.size() + 1); // one slot per merged id plus one for localId
+ state.reset();
+ int i = 0;
+ long removedTotal = 0; // long like mergeTotal: an int accumulator would silently truncate the long counts
+ boolean localWritten = false;
+ while (state.hasRemaining())
+ {
+ NodeId nodeId = state.getNodeId();
+ if (!localWritten && nodeId.compareTo(localId) > 0) // write the local shard exactly once, before the first greater id
{
- state.moveToNext();
+ merger.writeElement(localId, 1L, mergeTotal, true);
+ localWritten = true;
}
- else // c > 0, nodeid > record
+ if (i < toMerge.size() && nodeId.compareTo(toMerge.get(i)) == 0) // deliberately not 'else': the id that triggered the local write may itself be in toMerge
{
- currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
+ long count = state.getCount();
+ removedTotal += count;
+ merger.writeElement(nodeId, -now - state.getClock(), -count, true);
+ ++i;
}
+ state.moveToNext();
}
- return null;
+ if (!localWritten)
+ merger.writeElement(localId, 1L, mergeTotal, true);
+
+ // sanity check
+ assert mergeTotal == removedTotal;
+ return merger.context;
}
/**
@@ -621,59 +656,61 @@ public class CounterContext implements I
{
int hlength = headerLength(context);
ContextState state = new ContextState(context, hlength);
- int removedBodySize = 0, removedHeaderSize = 0;
- boolean forceFixing = false;
+ int removedShards = 0;
while (state.hasRemaining())
{
long clock = state.getClock();
- if (clock < 0 && -((int)(clock / 1000)) < gcBefore && (state.getCount() == 0 || !state.isDelta()))
- {
- removedBodySize += STEP_LENGTH;
- if (state.isDelta())
- removedHeaderSize += HEADER_ELT_LENGTH;
- }
- else if (clock < 0 && state.getCount() != 0 && state.isDelta())
+ if (clock < 0)
{
- forceFixing = true;
+ // We should never have a count != 0 when clock < 0.
+ // We know that previous versions may have created such situations though, so:
+ // * for delta shard: we throw an exception since computeOldShardMerger should
+ // have corrected that situation
+ // * for non-delta shard: it is a much worse situation because there is
+ // not much we can do since we are not responsible for that shard. So we simply
+ // ignore the shard.
+ if (state.getCount() != 0)
+ {
+ if (state.isDelta())
+ {
+ throw new IllegalStateException("Counter shard with negative clock but count != 0; context = " + toString(context));
+ }
+ else
+ {
+ logger.debug("Ignoring non-removable non-delta corrupted shard in context " + toString(context));
+ state.moveToNext();
+ continue;
+ }
+ }
+
+ if (-((int)(clock / 1000)) < gcBefore)
+ removedShards++;
}
state.moveToNext();
}
- if (removedBodySize == 0 && !forceFixing)
+ if (removedShards == 0)
return context;
- int newSize = context.remaining() - removedHeaderSize - removedBodySize;
+
+ int removedHeaderSize = removedShards * HEADER_ELT_LENGTH;
+ int newSize = context.remaining() - removedHeaderSize - (removedShards * STEP_LENGTH);
int newHlength = hlength - removedHeaderSize;
- ByteBuffer cleanedContext = ByteBuffer.allocate(newSize);
+ ByteBuffer cleanedContext = HeapAllocator.instance.allocate(newSize);
cleanedContext.putShort(cleanedContext.position(), (short) ((newHlength - HEADER_SIZE_LENGTH) / HEADER_ELT_LENGTH));
ContextState cleaned = new ContextState(cleanedContext, newHlength);
state.reset();
- long toAddBack = 0;
while (state.hasRemaining())
{
long clock = state.getClock();
- if (!(clock < 0 && -((int)(clock / 1000)) < gcBefore && (state.getCount() == 0 || !state.isDelta())))
+ if (!(clock < 0 && state.getCount() == 0 && -((int)(clock / 1000)) < gcBefore)) // must match the counting pass above, which only counted shards older than gcBefore when sizing cleanedContext
{
- if (clock < 0 && state.getCount() != 0 && state.isDelta())
- {
- // we should not get there, but we have been creating problematic context prior to #2968
- if (state.getNodeId().equals(NodeId.getLocalId()))
- throw new RuntimeException("Merged counter shard with a count != 0 (likely due to #2968). You need to restart this node with -Dcassandra.renew_counter_id=true to fix.");
-
- // we will "fix" it, but log a message
- logger.info("Collectable old shard with a count != 0. Will fix.");
- cleaned.writeElement(state.getNodeId(), clock - 1L, 0, true);
- toAddBack += state.getCount();
- }
- else
- {
- state.copyTo(cleaned);
- }
+ state.copyTo(cleaned);
}
state.moveToNext();
}
- return toAddBack == 0 ? cleanedContext : merge(cleanedContext, create(toAddBack, HeapAllocator.instance), HeapAllocator.instance);
+ return cleanedContext;
}
/**
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Nov 7 13:36:28 2011
@@ -273,7 +273,7 @@ public class StorageProxy implements Sto
*
* @throws TimeoutException if the hints cannot be written/enqueued
*/
- private static void sendToHintedEndpoints(final RowMutation rm,
+ public static void sendToHintedEndpoints(final RowMutation rm,
Collection<InetAddress> targets,
IWriteResponseHandler responseHandler,
String localDataCenter,
@@ -1226,7 +1226,7 @@ public class StorageProxy implements Sto
return !Gossiper.instance.getUnreachableMembers().isEmpty();
}
- private interface WritePerformer
+ public interface WritePerformer
{
public void apply(IMutation mutation, Collection<InetAddress> targets, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException, TimeoutException;
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CounterMutationTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CounterMutationTest.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CounterMutationTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CounterMutationTest.java Mon Nov 7 13:36:28 2011
@@ -19,6 +19,7 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
import org.junit.Test;
@@ -29,6 +30,7 @@ import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.*;
+import org.apache.cassandra.Util;
import static org.apache.cassandra.db.context.CounterContext.ContextState;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
@@ -40,13 +42,15 @@ public class CounterMutationTest extends
RowMutation rm;
CounterMutation cm;
+ NodeId id1 = NodeId.getLocalId();
+
rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
rm.addCounter(new QueryPath("Counter1", null, ByteBufferUtil.bytes("Column1")), 3);
cm = new CounterMutation(rm, ConsistencyLevel.ONE);
cm.apply();
NodeId.renewLocalId(2L); // faking time of renewal for test
- NodeId id1 = NodeId.getLocalId();
+ NodeId id2 = NodeId.getLocalId();
rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
rm.addCounter(new QueryPath("Counter1", null, ByteBufferUtil.bytes("Column1")), 4);
@@ -54,7 +58,7 @@ public class CounterMutationTest extends
cm.apply();
NodeId.renewLocalId(4L); // faking time of renewal for test
- NodeId id2 = NodeId.getLocalId();
+ NodeId id3 = NodeId.getLocalId();
rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
rm.addCounter(new QueryPath("Counter1", null, ByteBufferUtil.bytes("Column1")), 5);
@@ -62,20 +66,39 @@ public class CounterMutationTest extends
cm = new CounterMutation(rm, ConsistencyLevel.ONE);
cm.apply();
- RowMutation reprm = cm.makeReplicationMutation();
- ColumnFamily cf = reprm.getColumnFamilies().iterator().next();
- CounterColumn.removeOldShards(cf, Integer.MAX_VALUE);
+ DecoratedKey dk = Util.dk("key1");
+ ColumnFamily cf = Util.getColumnFamily(Table.open("Keyspace1"), dk, "Counter1");
+
+ // First merges old shards
+ CounterColumn.mergeAndRemoveOldShards(dk, cf, Integer.MIN_VALUE, Integer.MAX_VALUE, false);
+ long now = System.currentTimeMillis();
IColumn c = cf.getColumn(ByteBufferUtil.bytes("Column1"));
assert c != null;
assert c instanceof CounterColumn;
-
assert ((CounterColumn)c).total() == 12L;
ContextState s = new ContextState(c.value());
assert s.getNodeId().equals(id1);
- assert s.getCount() == 7;
+ assert s.getCount() == 0L;
+ assert -s.getClock() > now - 1000 : " >";
+ assert -s.getClock() <= now;
s.moveToNext();
assert s.getNodeId().equals(id2);
- assert s.getCount() == 5;
+ assert s.getCount() == 0L;
+ assert -s.getClock() > now - 1000;
+ assert -s.getClock() <= now;
+ s.moveToNext();
+ assert s.getNodeId().equals(id3);
+ assert s.getCount() == 12L;
+
+ // Then collect old shards
+ CounterColumn.mergeAndRemoveOldShards(dk, cf, Integer.MAX_VALUE, Integer.MIN_VALUE, false);
+ c = cf.getColumn(ByteBufferUtil.bytes("Column1"));
+ assert c != null;
+ assert c instanceof CounterColumn;
+ assert ((CounterColumn)c).total() == 12L;
+ s = new ContextState(c.value());
+ assert s.getNodeId().equals(id3);
+ assert s.getCount() == 12L;
}
@Test
@@ -131,6 +154,7 @@ public class CounterMutationTest extends
public void testRemoveOldShardFixCorrupted() throws IOException
{
CounterContext ctx = CounterContext.instance();
+ int now = (int) (System.currentTimeMillis() / 1000);
// Check that corrupted context created prior to #2968 are fixed by removeOldShards
NodeId id1 = NodeId.getLocalId();
@@ -140,19 +164,21 @@ public class CounterMutationTest extends
ContextState state = ContextState.allocate(3, 2);
state.writeElement(NodeId.fromInt(1), 1, 4, false);
state.writeElement(id1, 3, 2, true);
- state.writeElement(id2, -System.currentTimeMillis(), 5, true); // corrupted!
+ state.writeElement(id2, -100, 5, true); // corrupted!
assert ctx.total(state.context) == 11;
try
{
- ctx.removeOldShards(state.context, Integer.MAX_VALUE);
+ ByteBuffer merger = ctx.computeOldShardMerger(state.context, Collections.<NodeId.NodeIdRecord>emptyList(), 0);
+ ctx.removeOldShards(ctx.merge(state.context, merger, HeapAllocator.instance), now);
fail("RemoveOldShards should throw an exception if the current id is non-sensical");
}
catch (RuntimeException e) {}
NodeId.renewLocalId();
- ByteBuffer cleaned = ctx.removeOldShards(state.context, Integer.MAX_VALUE);
+ ByteBuffer merger = ctx.computeOldShardMerger(state.context, Collections.<NodeId.NodeIdRecord>emptyList(), 0);
+ ByteBuffer cleaned = ctx.removeOldShards(ctx.merge(state.context, merger, HeapAllocator.instance), now);
assert ctx.total(cleaned) == 11;
// Check it is not corrupted anymore
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/context/CounterContextTest.java?rev=1198732&r1=1198731&r2=1198732&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/context/CounterContextTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/context/CounterContextTest.java Mon Nov 7 13:36:28 2011
@@ -362,54 +362,31 @@ public class CounterContextTest
records.add(new NodeId.NodeIdRecord(id1, 2L));
records.add(new NodeId.NodeIdRecord(id3, 4L));
- // Destination of merge is a delta
- ContextState ctx = ContextState.allocate(5, 2, allocator);
- ctx.writeElement(id1, 1L, 1L);
+ ContextState ctx = ContextState.allocate(5, 3, allocator);
+ ctx.writeElement(id1, 1L, 1L, true);
ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
ctx.writeElement(id3, 3L, 3L, true);
ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
ctx.writeElement(NodeId.fromInt(5), 7L, 3L, true);
- ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records);
+ ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records, Integer.MAX_VALUE);
+
ContextState m = new ContextState(merger);
assert m.getNodeId().equals(id1);
assert m.getClock() <= -now;
- assert m.getCount() == 0;
+ assert m.getCount() == -1L;
+ assert m.isDelta();
m.moveToNext();
assert m.getNodeId().equals(id3);
- assert m.getClock() == 4L;
- assert m.getCount() == 1L;
- assert cc.total(ctx.context) == cc.total(cc.merge(ctx.context, merger, allocator));
-
- // Source of merge is a delta
- ctx = ContextState.allocate(4, 1, allocator);
- ctx.writeElement(id1, 1L, 1L, true);
- ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
- ctx.writeElement(id3, 3L, 3L);
- ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
-
- merger = cc.computeOldShardMerger(ctx.context, records);
- assert cc.total(ctx.context) == cc.total(cc.merge(ctx.context, merger, allocator));
-
- // source and destination of merge are deltas
- ctx = ContextState.allocate(4, 2, allocator);
- ctx.writeElement(id1, 1L, 1L, true);
- ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
- ctx.writeElement(id3, 3L, 3L, true);
- ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
-
- merger = cc.computeOldShardMerger(ctx.context, records);
- assert cc.total(ctx.context) == cc.total(cc.merge(ctx.context, merger, allocator));
-
- // none of source and destination of merge are deltas
- ctx = ContextState.allocate(4, 0, allocator);
- ctx.writeElement(id1, 1L, 1L);
- ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
- ctx.writeElement(id3, 3L, 3L);
- ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
-
- merger = cc.computeOldShardMerger(ctx.context, records);
+ assert m.getClock() <= -now;
+ assert m.getCount() == -3L;
+ assert m.isDelta();
+ m.moveToNext();
+ assert m.getNodeId().equals(NodeId.getLocalId());
+ assert m.getClock() == 1L;
+ assert m.getCount() == 4L;
+ assert m.isDelta();
assert cc.total(ctx.context) == cc.total(cc.merge(ctx.context, merger, allocator));
}
@@ -430,28 +407,20 @@ public class CounterContextTest
records.add(new NodeId.NodeIdRecord(id3, 4L));
records.add(new NodeId.NodeIdRecord(id6, 10L));
- ContextState ctx = ContextState.allocate(6, 2, allocator);
- ctx.writeElement(id1, 1L, 1L);
+ ContextState ctx = ContextState.allocate(6, 3, allocator);
+ ctx.writeElement(id1, 1L, 1L, true);
ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
ctx.writeElement(id3, 3L, 3L, true);
ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
ctx.writeElement(NodeId.fromInt(5), 7L, 3L, true);
ctx.writeElement(id6, 5L, 6L);
- ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records);
+ ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records, Integer.MAX_VALUE);
ByteBuffer merged = cc.merge(ctx.context, merger, allocator);
assert cc.total(ctx.context) == cc.total(merged);
ByteBuffer cleaned = cc.removeOldShards(merged, (int)(System.currentTimeMillis() / 1000) + 1);
assert cc.total(ctx.context) == cc.total(cleaned);
- assert cleaned.remaining() == ctx.context.remaining() - stepLength;
-
- merger = cc.computeOldShardMerger(cleaned, records);
- merged = cc.merge(cleaned, merger, allocator);
- assert cc.total(ctx.context) == cc.total(merged);
-
- cleaned = cc.removeOldShards(merged, (int)(System.currentTimeMillis() / 1000) + 1);
- assert cc.total(ctx.context) == cc.total(cleaned);
- assert cleaned.remaining() == ctx.context.remaining() - 2 * stepLength - 2;
+ assert cleaned.remaining() == ctx.context.remaining() - stepLength - 2;
}
}