You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/04/18 02:15:51 UTC
[4/4] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
CHANGES.txt
build.xml
debian/changelog
src/java/org/apache/cassandra/db/BatchlogManager.java
src/java/org/apache/cassandra/db/ColumnFamilyStore.java
src/java/org/apache/cassandra/db/HintedHandOffManager.java
src/java/org/apache/cassandra/db/SystemKeyspace.java
src/java/org/apache/cassandra/service/StorageProxy.java
test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/66af6fed
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/66af6fed
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/66af6fed
Branch: refs/heads/cassandra-2.1
Commit: 66af6fedc02eed630028043f8a6f0d3014f193d5
Parents: de8a479 384de4b
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Apr 18 03:14:47 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Apr 18 03:14:47 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 11 +-
.../apache/cassandra/db/BatchlogManager.java | 102 +++++++++++--------
.../apache/cassandra/db/ColumnFamilyStore.java | 6 --
.../cassandra/db/HintedHandOffManager.java | 19 +---
.../org/apache/cassandra/db/SystemKeyspace.java | 55 +++++++---
.../db/commitlog/CommitLogReplayer.java | 12 +--
.../apache/cassandra/service/StorageProxy.java | 9 +-
.../cassandra/db/BatchlogManagerTest.java | 84 +++++++++++++--
.../apache/cassandra/db/HintedHandOffTest.java | 19 ++--
10 files changed, 214 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 9f34023,ad26f6d..705f1b8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -108,6 -64,6 +108,7 @@@ Merged from 1.2
* Schedule schema pulls on change (CASSANDRA-6971)
* Non-droppable verbs shouldn't be dropped from OTC (CASSANDRA-6980)
* Shutdown batchlog executor in SS#drain() (CASSANDRA-7025)
++ * Fix batchlog to account for CF truncation records (CASSANDRA-6999)
2.0.6
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 9567ef3,05f9392..ac78a73
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -13,46 -13,16 +13,55 @@@ restore snapshots created with the prev
'sstableloader' tool. You can upgrade the file format of your snapshots
using the provided 'sstableupgrade' tool.
+2.1
+===
+
+New features
+------------
+ - SSTable data directory name is slightly changed. Each directory will
+ have hex string appended after CF name, e.g.
+ ks/cf-5be396077b811e3a3ab9dc4b9ac088d/
+ This hex string part represents unique ColumnFamily ID.
+ Note that existing directories are used as is, so only newly created
+ directories after upgrade have new directory name format.
+ - Saved key cache files also have ColumnFamily ID in their file name.
+ - It is now possible to do incremental repairs, sstables that have been
+ repaired are marked with a timestamp and not included in the next
+ repair session. Use nodetool repair -par -inc to use this feature.
+ A tool to manually mark/unmark sstables as repaired is available in
+ tools/bin/sstablerepairedset.
+
+Upgrading
+---------
+ - Rolling upgrades from anything pre-2.0.7 is not supported. Furthermore
+ pre-2.0 sstables are not supported. This means that before upgrading
+ a node on 2.1, this node must be started on 2.0 and
+ 'nodetool upgdradesstables' must be run (and this even in the case
+ of not-rolling upgrades).
+ - For size-tiered compaction users, Cassandra now defaults to ignoring
+ the coldest 5% of sstables. This can be customized with the
+ cold_reads_to_omit compaction option; 0.0 omits nothing (the old
+ behavior) and 1.0 omits everything.
+ - Multithreaded compaction has been removed.
+ - Counters implementation has been changed, replaced by a safer one with
+ less caveats, but different performance characteristics. You might have
+ to change your data model to accomodate the new implementation.
+ (See https://issues.apache.org/jira/browse/CASSANDRA-6504 and the dev
+ blog post at http://www.datastax.com/dev/blog/<PLACEHOLDER> for details).
+ - (per-table) index_interval parameter has been replaced with
+ min_index_interval and max_index_interval paratemeters. index_interval
+ has been deprecated.
+
+ 2.0.7
+ =====
+
+ Upgrading
+ ---------
+ - Nothing specific to this release, but please see 2.0.6 if you are upgrading
+ from a previous version.
+
+
2.0.6
=====
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java
index 02c029d,5770994..7f86a6e
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@@ -244,45 -247,72 +240,72 @@@ public class BatchlogManager implement
{
DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
int size = in.readInt();
- List<RowMutation> mutations = new ArrayList<>(size);
++ List<Mutation> mutations = new ArrayList<>(size);
+
for (int i = 0; i < size; i++)
- replaySerializedMutation(Mutation.serializer.deserialize(in, version), writtenAt, version, rateLimiter);
+ {
- RowMutation mutation = RowMutation.serializer.deserialize(in, version);
++ Mutation mutation = Mutation.serializer.deserialize(in, version);
+
+ // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
+ // We don't abort the replay entirely b/c this can be considered a succes (truncated is same as delivered then
+ // truncated.
+ for (UUID cfId : mutation.getColumnFamilyIds())
+ if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
+ mutation = mutation.without(cfId);
+
+ if (!mutation.isEmpty())
+ mutations.add(mutation);
+ }
+
+ if (!mutations.isEmpty())
+ replayMutations(mutations, writtenAt, version, rateLimiter);
}
/*
* We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
* when a replica is down or a write request times out.
*/
- private void replaySerializedMutation(Mutation mutation, long writtenAt, int version, RateLimiter rateLimiter)
- private void replayMutations(List<RowMutation> mutations, long writtenAt, int version, RateLimiter rateLimiter) throws IOException
++ private void replayMutations(List<Mutation> mutations, long writtenAt, int version, RateLimiter rateLimiter) throws IOException
{
- int ttl = calculateHintTTL(mutation, writtenAt);
+ int ttl = calculateHintTTL(mutations, writtenAt);
if (ttl <= 0)
- return; // the mutation isn't safe to replay.
-
- Set<InetAddress> liveEndpoints = new HashSet<>();
- String ks = mutation.getKeyspaceName();
- Token<?> tk = StorageService.getPartitioner().getToken(mutation.key());
- int mutationSize = (int) Mutation.serializer.serializedSize(mutation, version);
+ return; // this batchlog entry has 'expired'
- for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
- StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
- for (RowMutation mutation : mutations)
++ for (Mutation mutation : mutations)
{
- rateLimiter.acquire(mutationSize);
- if (endpoint.equals(FBUtilities.getBroadcastAddress()))
- mutation.apply();
- else if (FailureDetector.instance.isAlive(endpoint))
- liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
- else
- StorageProxy.writeHintForMutation(mutation, ttl, endpoint);
- }
+ List<InetAddress> liveEndpoints = new ArrayList<>();
+ List<InetAddress> hintEndpoints = new ArrayList<>();
- if (!liveEndpoints.isEmpty())
- attemptDirectDelivery(mutation, writtenAt, liveEndpoints);
+ String ks = mutation.getKeyspaceName();
+ Token tk = StorageService.getPartitioner().getToken(mutation.key());
- int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, version);
++ int mutationSize = (int) Mutation.serializer.serializedSize(mutation, version);
+
+ for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
+ StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
+ {
+ rateLimiter.acquire(mutationSize);
+ if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ mutation.apply();
+ else if (FailureDetector.instance.isAlive(endpoint))
+ liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
+ else
+ hintEndpoints.add(endpoint);
+ }
+
+ if (!liveEndpoints.isEmpty())
+ hintEndpoints.addAll(attemptDirectDelivery(mutation, liveEndpoints));
+
+ for (InetAddress endpoint : hintEndpoints)
+ StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint);
+ }
}
- private void attemptDirectDelivery(Mutation mutation, long writtenAt, Set<InetAddress> endpoints)
+ // Returns the endpoints we failed to deliver to.
- private Set<InetAddress> attemptDirectDelivery(RowMutation mutation, List<InetAddress> endpoints) throws IOException
++ private Set<InetAddress> attemptDirectDelivery(Mutation mutation, List<InetAddress> endpoints) throws IOException
{
- List<WriteResponseHandler> handlers = Lists.newArrayList();
- final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<>(endpoints);
+ final List<WriteResponseHandler> handlers = new ArrayList<>();
+ final Set<InetAddress> undelivered = Collections.synchronizedSet(new HashSet<InetAddress>());
+
for (final InetAddress ep : endpoints)
{
Runnable callback = new Runnable()
@@@ -310,22 -340,26 +333,21 @@@
}
}
- if (!undelivered.isEmpty())
- {
- int ttl = calculateHintTTL(mutation, writtenAt); // recalculate ttl
- if (ttl > 0)
- for (InetAddress endpoint : undelivered)
- StorageProxy.writeHintForMutation(mutation, ttl, endpoint);
- }
+ return undelivered;
}
- // calculate ttl for the mutation's hint (and reduce ttl by the time the mutation spent in the batchlog).
- // this ensures that deletes aren't "undone" by an old batch replay.
- private int calculateHintTTL(Mutation mutation, long writtenAt)
+ /*
+ * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
+ * This ensures that deletes aren't "undone" by an old batch replay.
+ */
- private int calculateHintTTL(List<RowMutation> mutations, long writtenAt)
++ private int calculateHintTTL(List<Mutation> mutations, long writtenAt)
{
- return (int) ((HintedHandOffManager.calculateHintTTL(mutation) * 1000 - (System.currentTimeMillis() - writtenAt)) / 1000);
+ int unadjustedTTL = Integer.MAX_VALUE;
- for (RowMutation mutation : mutations)
++ for (Mutation mutation : mutations)
+ unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
+ return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt);
}
- private static ByteBuffer columnName(String name)
- {
- return CFMetaData.BatchlogCf.getCfDef().getColumnNameBuilder().add(UTF8Type.instance.decompose(name)).build();
- }
-
// force flush + compaction to reclaim space from the replayed batches
private void cleanup() throws ExecutionException, InterruptedException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 923ea5b,36bc470..8f96765
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -2795,17 -2416,6 +2795,11 @@@ public class ColumnFamilyStore implemen
return getDataTracker().getDroppableTombstoneRatio();
}
- public long getTruncationTime()
- {
- Pair<ReplayPosition, Long> truncationRecord = SystemKeyspace.getTruncationRecords().get(metadata.cfId);
- return truncationRecord == null ? Long.MIN_VALUE : truncationRecord.right;
- }
-
+ public long trueSnapshotsSize()
+ {
+ return directories.trueSnapshotsSize();
+ }
+
@VisibleForTesting
void resetFileIndexGenerator()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 12404da,942707e..e83aefc
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@@ -121,7 -119,7 +120,7 @@@ public class HintedHandOffManager imple
* Returns a mutation representing a Hint to be sent to <code>targetId</code>
* as soon as it becomes available again.
*/
- public Mutation hintFor(Mutation mutation, int ttl, UUID targetId)
- public RowMutation hintFor(RowMutation mutation, long now, int ttl, UUID targetId)
++ public Mutation hintFor(Mutation mutation, long now, int ttl, UUID targetId)
{
assert ttl > 0;
@@@ -134,11 -132,11 +133,11 @@@
UUID hintId = UUIDGen.getTimeUUID();
// serialize the hint with id and version as a composite column name
- ByteBuffer name = comparator.decompose(hintId, MessagingService.current_version);
- ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, RowMutation.serializer, MessagingService.current_version));
+ CellName name = CFMetaData.HintsCf.comparator.makeCellName(hintId, MessagingService.current_version);
+ ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version));
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(Keyspace.SYSTEM_KS, SystemKeyspace.HINTS_CF));
- cf.addColumn(name, value, System.currentTimeMillis(), ttl);
+ cf.addColumn(name, value, now, ttl);
- return new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(targetId), cf);
+ return new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(targetId), cf);
}
/*
@@@ -389,8 -387,7 +388,7 @@@
}
List<WriteResponseHandler> responseHandlers = Lists.newArrayList();
- Map<UUID, Long> truncationTimesCache = new HashMap<UUID, Long>();
- for (final Column hint : hintsPage)
+ for (final Cell hint : hintsPage)
{
// check if hints delivery has been paused during the process
if (hintedHandOffPaused)
@@@ -427,21 -425,12 +425,12 @@@
throw new AssertionError(e);
}
- truncationTimesCache.clear();
- for (UUID cfId : ImmutableSet.copyOf((mutation.getColumnFamilyIds())))
- for (UUID cfId : rm.getColumnFamilyIds())
++ for (UUID cfId : mutation.getColumnFamilyIds())
{
- Long truncatedAt = truncationTimesCache.get(cfId);
- if (truncatedAt == null)
- {
- ColumnFamilyStore cfs = Keyspace.open(mutation.getKeyspaceName()).getColumnFamilyStore(cfId);
- truncatedAt = cfs.getTruncationTime();
- truncationTimesCache.put(cfId, truncatedAt);
- }
-
- if (hint.timestamp() < truncatedAt)
- if (hint.maxTimestamp() <= SystemKeyspace.getTruncatedAt(cfId))
++ if (hint.timestamp() <= SystemKeyspace.getTruncatedAt(cfId))
{
logger.debug("Skipping delivery of hint for truncated columnfamily {}", cfId);
- rm = rm.without(cfId);
+ mutation = mutation.without(cfId);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index a4072d1,fe8f179..b19eb1e
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -41,10 -40,8 +41,10 @@@ import org.apache.cassandra.config.Sche
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
- import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
+ import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.Composites;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.dht.Range;
@@@ -90,12 -86,8 +90,14 @@@ public class SystemKeyspac
private static final String LOCAL_KEY = "local";
private static final ByteBuffer ALL_LOCAL_NODE_ID_KEY = ByteBufferUtil.bytes("Local");
+ public static final List<String> allSchemaCfs = Arrays.asList(SCHEMA_KEYSPACES_CF,
+ SCHEMA_COLUMNFAMILIES_CF,
+ SCHEMA_COLUMNS_CF,
+ SCHEMA_TRIGGERS_CF,
+ SCHEMA_USER_TYPES_CF);
+
+ private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords;
+
public enum BootstrapState
{
NEEDS_BOOTSTRAP,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index a9d061a,dce7256..1b137ca
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -911,7 -900,10 +911,10 @@@ public class StorageProxy implements St
return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
}
- public static void writeHintForMutation(Mutation mutation, int ttl, InetAddress target)
+ /**
+ * @param now current time in milliseconds - relevant for hint replay handling of truncated CFs
+ */
- public static void writeHintForMutation(RowMutation mutation, long now, int ttl, InetAddress target)
++ public static void writeHintForMutation(Mutation mutation, long now, int ttl, InetAddress target)
{
assert ttl > 0;
UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index 43f96fb,954c1f2..9982be9
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@@ -28,9 -31,8 +31,10 @@@ import org.apache.cassandra.Util
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.composites.CellNameType;
+ import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.UUIDGen;
@@@ -106,4 -103,66 +110,72 @@@ public class BatchlogManagerTest extend
UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT count(*) FROM \"Keyspace1\".\"Standard1\""));
assertEquals(500, result.one().getLong("count"));
}
+
+ @Test
+ public void testTruncatedReplay() throws InterruptedException, ExecutionException
+ {
++ CellNameType comparator2 = Keyspace.open("Keyspace1").getColumnFamilyStore("Standard2").metadata.comparator;
++ CellNameType comparator3 = Keyspace.open("Keyspace1").getColumnFamilyStore("Standard3").metadata.comparator;
+ // Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog.
+ // Each batchlog entry with a mutation for Standard2 and Standard3.
+ // In the middle of the process, 'truncate' Standard2.
+ for (int i = 0; i < 1000; i++)
+ {
- RowMutation mutation1 = new RowMutation("Keyspace1", bytes(i));
- mutation1.add("Standard2", bytes(i), bytes(i), 0);
- RowMutation mutation2 = new RowMutation("Keyspace1", bytes(i));
- mutation2.add("Standard3", bytes(i), bytes(i), 0);
- List<RowMutation> mutations = Lists.newArrayList(mutation1, mutation2);
++ Mutation mutation1 = new Mutation("Keyspace1", bytes(i));
++ mutation1.add("Standard2", comparator2.makeCellName(bytes(i)), bytes(i), 0);
++ Mutation mutation2 = new Mutation("Keyspace1", bytes(i));
++ mutation2.add("Standard3", comparator3.makeCellName(bytes(i)), bytes(i), 0);
++ List<Mutation> mutations = Lists.newArrayList(mutation1, mutation2);
+
+ // Make sure it's ready to be replayed, so adjust the timestamp.
+ long timestamp = System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2;
+
+ if (i == 500)
+ SystemKeyspace.saveTruncationRecord(Keyspace.open("Keyspace1").getColumnFamilyStore("Standard2"),
+ timestamp,
+ ReplayPosition.NONE);
+
+ // Adjust the timestamp (slightly) to make the test deterministic.
+ if (i >= 500)
+ timestamp++;
+ else
+ timestamp--;
+
- BatchlogManager.getBatchlogMutationFor(mutations, UUIDGen.getTimeUUID(), timestamp * 1000).apply();
++ BatchlogManager.getBatchlogMutationFor(mutations,
++ UUIDGen.getTimeUUID(),
++ MessagingService.current_version,
++ timestamp * 1000)
++ .apply();
+ }
+
+ // Flush the batchlog to disk (see CASSANDRA-6822).
+ Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.BATCHLOG_CF).forceFlush();
+
+ // Force batchlog replay.
+ BatchlogManager.instance.replayAllFailedBatches();
+
+ // We should see half of Standard2-targeted mutations written after the replay and all of Standard3 mutations applied.
+ for (int i = 0; i < 1000; i++)
+ {
+ UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard2\" WHERE key = intAsBlob(%d)", i));
+ if (i >= 500)
+ {
+ assertEquals(bytes(i), result.one().getBytes("key"));
+ assertEquals(bytes(i), result.one().getBytes("column1"));
+ assertEquals(bytes(i), result.one().getBytes("value"));
+ }
+ else
+ {
+ assertTrue(result.isEmpty());
+ }
+ }
+
+ for (int i = 0; i < 1000; i++)
+ {
+ UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard3\" WHERE key = intAsBlob(%d)", i));
+ assertEquals(bytes(i), result.one().getBytes("key"));
+ assertEquals(bytes(i), result.one().getBytes("column1"));
+ assertEquals(bytes(i), result.one().getBytes("value"));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/HintedHandOffTest.java
index c3b9367,9ffd702..622c816
--- a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
+++ b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
@@@ -28,8 -27,9 +27,10 @@@ import java.util.concurrent.TimeUnit
import org.junit.Test;
+ import com.google.common.collect.Iterators;
+
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
@@@ -63,10 -61,14 +62,14 @@@ public class HintedHandOffTest extends
hintStore.disableAutoCompaction();
// insert 1 hint
- RowMutation rm = new RowMutation(KEYSPACE4, ByteBufferUtil.bytes(1));
- rm.add(STANDARD1_CF, ByteBufferUtil.bytes(String.valueOf(COLUMN1)), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis());
+ Mutation rm = new Mutation(KEYSPACE4, ByteBufferUtil.bytes(1));
+ rm.add(STANDARD1_CF, Util.cellname(COLUMN1), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis());
- HintedHandOffManager.instance.hintFor(rm, HintedHandOffManager.calculateHintTTL(rm), UUID.randomUUID()).apply();
+ HintedHandOffManager.instance.hintFor(rm,
+ System.currentTimeMillis(),
+ HintedHandOffManager.calculateHintTTL(rm),
+ UUID.randomUUID())
+ .apply();
// flush data to disk
hintStore.forceBlockingFlush();
@@@ -102,10 -104,14 +105,14 @@@
hintStore.clearUnsafe();
// insert 1 hint
- RowMutation rm = new RowMutation(KEYSPACE4, ByteBufferUtil.bytes(1));
- rm.add(STANDARD1_CF, ByteBufferUtil.bytes(String.valueOf(COLUMN1)), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis());
+ Mutation rm = new Mutation(KEYSPACE4, ByteBufferUtil.bytes(1));
+ rm.add(STANDARD1_CF, Util.cellname(COLUMN1), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis());
- HintedHandOffManager.instance.hintFor(rm, HintedHandOffManager.calculateHintTTL(rm), UUID.randomUUID()).apply();
+ HintedHandOffManager.instance.hintFor(rm,
+ System.currentTimeMillis(),
+ HintedHandOffManager.calculateHintTTL(rm),
+ UUID.randomUUID())
+ .apply();
assert getNoOfHints() == 1;