You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/08/06 16:33:41 UTC
[1/2] cassandra git commit: Optimize batchlog replay to avoid full scans
Repository: cassandra
Updated Branches:
refs/heads/trunk bf8ac1acd -> c35bfc09c
Optimize batchlog replay to avoid full scans
patch by Branimir Lambov; reviewed by Aleksey Yeschenko for
CASSANDRA-7237
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/762db474
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/762db474
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/762db474
Branch: refs/heads/trunk
Commit: 762db474273f764b189d3613fce33943cd64701b
Parents: ef59624
Author: Branimir Lambov <br...@datastax.com>
Authored: Sat Aug 1 11:55:47 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Aug 6 17:12:28 2015 +0300
----------------------------------------------------------------------
CHANGES.txt | 2 +
NEWS.txt | 2 +
.../apache/cassandra/db/BatchlogManager.java | 226 +++++++++--------
.../apache/cassandra/db/ColumnFamilyStore.java | 13 +
src/java/org/apache/cassandra/db/Memtable.java | 2 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 28 ++-
.../apache/cassandra/dht/LocalPartitioner.java | 30 ++-
.../apache/cassandra/service/StorageProxy.java | 2 +-
.../cassandra/db/BatchlogManagerTest.java | 246 +++++++++++++++----
9 files changed, 395 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 80e0e50..95fade9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.0-beta1
+ * Optimize batchlog replay to avoid full scans (CASSANDRA-7237)
* Repair improvements when using vnodes (CASSANDRA-5220)
* Disable scripted UDFs by default (CASSANDRA-9889)
* Add transparent data encryption core classes (CASSANDRA-9945)
@@ -11,6 +12,7 @@ Merged from 2.1:
Merged from 2.0:
* Don't cast expected bf size to an int (CASSANDRA-9959)
+
3.0.0-alpha1
* Implement proper sandboxing for UDFs (CASSANDRA-9402)
* Simplify (and unify) cleanup of compaction leftovers (CASSANDRA-7066)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 1fcbb12..ef61f6c 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -58,6 +58,8 @@ Upgrading
be done by setting the new option `enabled` to `false`.
- Only map syntax is now allowed for caching options. ALL/NONE/KEYS_ONLY/ROWS_ONLY syntax
has been deprecated since 2.1.0 and is being removed in 3.0.0.
+ - Batchlog entries are now stored in a new table, system.batches.
+ The old system.batchlog table has been deprecated.
2.2
http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 9e90d9d..8ea4318 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -23,30 +23,24 @@ import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.*;
import com.google.common.util.concurrent.RateLimiter;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.WriteFailureException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -57,20 +51,22 @@ import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.UUIDGen;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging;
public class BatchlogManager implements BatchlogManagerMBean
{
public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
- private static final long REPLAY_INTERVAL = 60 * 1000; // milliseconds
- private static final int PAGE_SIZE = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size.
+ private static final long REPLAY_INTERVAL = 10 * 1000; // milliseconds
+ private static final int DEFAULT_PAGE_SIZE = 128;
private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
public static final BatchlogManager instance = new BatchlogManager();
- private final AtomicLong totalBatchesReplayed = new AtomicLong();
+ private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread.
+ private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0);
// Single-thread executor service for scheduling and serializing log replay.
private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
@@ -87,15 +83,20 @@ public class BatchlogManager implements BatchlogManagerMBean
throw new RuntimeException(e);
}
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws ExecutionException, InterruptedException
- {
- replayAllFailedBatches();
- }
- };
+ batchlogTasks.schedule(this::replayInitially, StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
+
+ batchlogTasks.scheduleWithFixedDelay(this::replayAllFailedBatches,
+ StorageService.RING_DELAY + REPLAY_INTERVAL,
+ REPLAY_INTERVAL,
+ TimeUnit.MILLISECONDS);
+ }
+
+ private void replayInitially()
+ {
+ // The initial run must first convert entries from the legacy batchlog table (which may have non-time-uuid ids, as written by version 1.2).
+ convertOldBatchEntries();
- batchlogTasks.scheduleWithFixedDelay(runnable, StorageService.RING_DELAY, REPLAY_INTERVAL, TimeUnit.MILLISECONDS);
+ replayAllFailedBatches();
}
public static void shutdown() throws InterruptedException
@@ -106,13 +107,16 @@ public class BatchlogManager implements BatchlogManagerMBean
public int countAllBatches()
{
- String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHLOG);
- return (int) executeInternal(query).one().getLong("count");
+ String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHES);
+ UntypedResultSet results = executeInternal(query);
+ if (results.isEmpty())
+ return 0;
+ return (int) results.one().getLong("count");
}
public long getTotalBatchesReplayed()
{
- return totalBatchesReplayed.longValue();
+ return totalBatchesReplayed;
}
public void forceBatchlogReplay() throws Exception
@@ -122,34 +126,27 @@ public class BatchlogManager implements BatchlogManagerMBean
public Future<?> startBatchlogReplay()
{
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws ExecutionException, InterruptedException
- {
- replayAllFailedBatches();
- }
- };
// If a replay is already in progress this request will be executed after it completes.
- return batchlogTasks.submit(runnable);
+ return batchlogTasks.submit(this::replayAllFailedBatches);
}
- public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version)
+ void performInitialReplay() throws InterruptedException, ExecutionException
{
- return getBatchlogMutationFor(mutations, uuid, version, FBUtilities.timestampMicros());
+ // Invokes initial replay. Used for testing only.
+ batchlogTasks.submit(this::replayInitially).get();
}
- @VisibleForTesting
- static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version, long now)
+ public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version)
{
- return new RowUpdateBuilder(SystemKeyspace.Batchlog, now, uuid)
+ return new RowUpdateBuilder(SystemKeyspace.Batches, FBUtilities.timestampMicros(), uuid)
.clustering()
.add("data", serializeMutations(mutations, version))
- .add("written_at", new Date(now / 1000))
.add("version", version)
.build();
}
- private static ByteBuffer serializeMutations(Collection<Mutation> mutations, int version)
+ @VisibleForTesting
+ static ByteBuffer serializeMutations(Collection<Mutation> mutations, int version)
{
try (DataOutputBuffer buf = new DataOutputBuffer())
{
@@ -164,7 +161,7 @@ public class BatchlogManager implements BatchlogManagerMBean
}
}
- private void replayAllFailedBatches() throws ExecutionException, InterruptedException
+ private void replayAllFailedBatches()
{
logger.debug("Started replayAllFailedBatches");
@@ -173,67 +170,62 @@ public class BatchlogManager implements BatchlogManagerMBean
int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
- UntypedResultSet page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
- SystemKeyspace.NAME,
- SystemKeyspace.BATCHLOG,
- PAGE_SIZE));
-
- while (!page.isEmpty())
- {
- UUID id = processBatchlogPage(page, rateLimiter);
-
- if (page.size() < PAGE_SIZE)
- break; // we've exhausted the batchlog, next query would be empty.
-
- page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d",
- SystemKeyspace.NAME,
- SystemKeyspace.BATCHLOG,
- PAGE_SIZE),
- id);
- }
+ UUID limitUuid = UUIDGen.maxTimeUUID(System.currentTimeMillis() - getBatchlogTimeout());
+ int pageSize = calculatePageSize();
+ // There cannot be any live content where token(id) <= token(lastReplayedUuid) as every processed batch is
+ // deleted, but the tombstoned content may still be present in the tables. To avoid walking over it we specify
+ // token(id) > token(lastReplayedUuid) as part of the query.
+ String query = String.format("SELECT id, data, version FROM %s.%s WHERE token(id) > token(?) AND token(id) <= token(?)",
+ SystemKeyspace.NAME,
+ SystemKeyspace.BATCHES);
+ UntypedResultSet batches = executeInternalWithPaging(query, pageSize, lastReplayedUuid, limitUuid);
+ processBatchlogEntries(batches, pageSize, rateLimiter);
+ lastReplayedUuid = limitUuid;
+ logger.debug("Finished replayAllFailedBatches");
+ }
- cleanup();
+ // read fewer rows (batches) per page if they are very large
+ private static int calculatePageSize()
+ {
+ ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES);
+ double averageRowSize = store.getMeanPartitionSize();
+ if (averageRowSize <= 0)
+ return DEFAULT_PAGE_SIZE;
- logger.debug("Finished replayAllFailedBatches");
+ return (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, 4 * 1024 * 1024 / averageRowSize));
}
- private void deleteBatch(UUID id)
+ private static void deleteBatch(UUID id)
{
Mutation mutation = new Mutation(
- PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog,
+ PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
UUIDType.instance.decompose(id),
FBUtilities.timestampMicros(),
FBUtilities.nowInSeconds()));
mutation.apply();
}
- private UUID processBatchlogPage(UntypedResultSet page, RateLimiter rateLimiter)
+ private void processBatchlogEntries(UntypedResultSet batches, int pageSize, RateLimiter rateLimiter)
{
- UUID id = null;
- ArrayList<Batch> batches = new ArrayList<>(page.size());
+ int positionInPage = 0;
+ ArrayList<Batch> unfinishedBatches = new ArrayList<>(pageSize);
// Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
- for (UntypedResultSet.Row row : page)
+ for (UntypedResultSet.Row row : batches)
{
- id = row.getUUID("id");
- long writtenAt = row.getLong("written_at");
- // enough time for the actual write + batchlog entry mutation delivery (two separate requests).
- long timeout = getBatchlogTimeout();
- if (System.currentTimeMillis() < writtenAt + timeout)
- continue; // not ready to replay yet, might still get a deletion.
-
- int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
- Batch batch = new Batch(id, writtenAt, row.getBytes("data"), version);
+ UUID id = row.getUUID("id");
+ int version = row.getInt("version");
+ Batch batch = new Batch(id, row.getBytes("data"), version);
try
{
if (batch.replay(rateLimiter) > 0)
{
- batches.add(batch);
+ unfinishedBatches.add(batch);
}
else
{
deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated).
- totalBatchesReplayed.incrementAndGet();
+ ++totalBatchesReplayed;
}
}
catch (IOException e)
@@ -241,22 +233,31 @@ public class BatchlogManager implements BatchlogManagerMBean
logger.warn("Skipped batch replay of {} due to {}", id, e);
deleteBatch(id);
}
+
+ if (++positionInPage == pageSize)
+ {
+ // We have reached the end of a page. To avoid keeping more than a page of mutations in memory,
+ // finish processing the page before requesting the next row.
+ finishAndClearBatches(unfinishedBatches);
+ positionInPage = 0;
+ }
}
+ finishAndClearBatches(unfinishedBatches);
+ }
- // now waiting for all batches to complete their processing
+ private void finishAndClearBatches(ArrayList<Batch> batches)
+ {
// schedule hints for timed out deliveries
for (Batch batch : batches)
{
batch.finish();
deleteBatch(batch.id);
}
-
- totalBatchesReplayed.addAndGet(batches.size());
-
- return id;
+ totalBatchesReplayed += batches.size();
+ batches.clear();
}
- public long getBatchlogTimeout()
+ public static long getBatchlogTimeout()
{
return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
}
@@ -270,10 +271,10 @@ public class BatchlogManager implements BatchlogManagerMBean
private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
- public Batch(UUID id, long writtenAt, ByteBuffer data, int version)
+ Batch(UUID id, ByteBuffer data, int version)
{
this.id = id;
- this.writtenAt = writtenAt;
+ this.writtenAt = UUIDGen.unixTimestamp(id);
this.data = data;
this.version = version;
}
@@ -366,7 +367,7 @@ public class BatchlogManager implements BatchlogManagerMBean
}
}
- private List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations, long writtenAt, int ttl)
+ private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations, long writtenAt, int ttl)
{
List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
for (Mutation mutation : mutations)
@@ -384,7 +385,7 @@ public class BatchlogManager implements BatchlogManagerMBean
*
* @return direct delivery handler to wait on or null, if no live nodes found
*/
- private ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl)
+ private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl)
{
Set<InetAddress> liveEndpoints = new HashSet<>();
String ks = mutation.getKeyspaceName();
@@ -429,9 +430,9 @@ public class BatchlogManager implements BatchlogManagerMBean
*/
private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
{
- private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<InetAddress, Boolean>());
+ private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
- public ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
+ ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
{
super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
undelivered.addAll(writeEndpoints);
@@ -453,17 +454,42 @@ public class BatchlogManager implements BatchlogManagerMBean
}
}
- // force flush + compaction to reclaim space from the replayed batches
- private void cleanup() throws ExecutionException, InterruptedException
+ @SuppressWarnings("deprecation")
+ private static void convertOldBatchEntries()
{
- ColumnFamilyStore cfs = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG);
- cfs.forceBlockingFlush();
- Collection<Descriptor> descriptors = new ArrayList<>();
- // expects ALL sstables to be available for compaction, so just use live set...
- for (SSTableReader sstr : cfs.getSSTables(SSTableSet.LIVE))
- descriptors.add(sstr.descriptor);
- if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact.
- CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get();
+ logger.debug("Started convertOldBatchEntries");
+
+ String query = String.format("SELECT id, data, written_at, version FROM %s.%s",
+ SystemKeyspace.NAME,
+ SystemKeyspace.LEGACY_BATCHLOG);
+ UntypedResultSet batches = executeInternalWithPaging(query, DEFAULT_PAGE_SIZE);
+ int convertedBatches = 0;
+ for (UntypedResultSet.Row row : batches)
+ {
+ UUID id = row.getUUID("id");
+ long timestamp = row.getLong("written_at");
+ int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
+ logger.debug("Converting mutation at " + timestamp);
+
+ UUID newId = id;
+ if (id.version() != 1 || timestamp != UUIDGen.unixTimestamp(id))
+ newId = UUIDGen.getTimeUUID(timestamp, convertedBatches);
+ ++convertedBatches;
+
+ Mutation addRow = new RowUpdateBuilder(SystemKeyspace.Batches,
+ FBUtilities.timestampMicros(),
+ newId)
+ .clustering()
+ .add("data", row.getBytes("data"))
+ .add("version", version)
+ .build();
+
+ addRow.apply();
+ }
+ if (convertedBatches > 0)
+ Keyspace.openAndGetStore(SystemKeyspace.LegacyBatchlog).truncateBlocking();
+ // the converted batches are replayed by replayAllFailedBatches(), which replayInitially() runs next
+ logger.debug("Finished convertOldBatchEntries");
}
public static class EndpointFilter
@@ -504,9 +530,7 @@ public class BatchlogManager implements BatchlogManagerMBean
if (validated.keySet().size() == 1)
{
// we have only 1 `other` rack
- // pick up to two random nodes from there
- List<InetAddress> otherRack = validated.get(validated.keySet().iterator().next());
- Collections.shuffle(otherRack);
+ Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values());
return Lists.newArrayList(Iterables.limit(otherRack, 2));
}
@@ -519,7 +543,7 @@ public class BatchlogManager implements BatchlogManagerMBean
else
{
racks = Lists.newArrayList(validated.keySet());
- Collections.shuffle((List) racks);
+ Collections.shuffle((List<String>) racks);
}
// grab a random member of up to two racks
http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 1f3c7db..255f9a0 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2054,6 +2054,19 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return count > 0 ? (int) (sum / count) : 0;
}
+ public double getMeanPartitionSize()
+ {
+ long sum = 0;
+ long count = 0;
+ for (SSTableReader sstable : getSSTables(SSTableSet.CANONICAL))
+ {
+ long n = sstable.getEstimatedPartitionSize().count();
+ sum += sstable.getEstimatedPartitionSize().mean() * n;
+ count += n;
+ }
+ return count > 0 ? sum * 1.0 / count : 0;
+ }
+
public long estimateKeys()
{
long n = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index a950e17..2db0ce9 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -342,7 +342,7 @@ public class Memtable implements Comparable<Memtable>
+ liveDataSize.get()) // data
* 1.2); // bloom filter and row index overhead
- this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHLOG) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
+ this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
}
public long getExpectedWriteSize()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 2d0ca24..bc0be65 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -88,7 +89,7 @@ public final class SystemKeyspace
public static final String NAME = "system";
public static final String HINTS = "hints";
- public static final String BATCHLOG = "batchlog";
+ public static final String BATCHES = "batches";
public static final String PAXOS = "paxos";
public static final String BUILT_INDEXES = "IndexInfo";
public static final String LOCAL = "local";
@@ -102,6 +103,7 @@ public final class SystemKeyspace
public static final String MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS = "materialized_views_builds_in_progress";
public static final String BUILT_MATERIALIZED_VIEWS = "built_materialized_views";
+ @Deprecated public static final String LEGACY_BATCHLOG = "batchlog";
@Deprecated public static final String LEGACY_KEYSPACES = "schema_keyspaces";
@Deprecated public static final String LEGACY_COLUMNFAMILIES = "schema_columnfamilies";
@Deprecated public static final String LEGACY_COLUMNS = "schema_columns";
@@ -123,15 +125,15 @@ public final class SystemKeyspace
.compaction(CompactionParams.scts(singletonMap("enabled", "false")))
.gcGraceSeconds(0);
- public static final CFMetaData Batchlog =
- compile(BATCHLOG,
+ public static final CFMetaData Batches =
+ compile(BATCHES,
"batches awaiting replay",
"CREATE TABLE %s ("
- + "id uuid,"
+ + "id timeuuid,"
+ "data blob,"
+ "version int,"
- + "written_at timestamp,"
+ "PRIMARY KEY ((id)))")
+ .copy(new LocalPartitioner(TimeUUIDType.instance))
.compaction(CompactionParams.scts(singletonMap("min_threshold", "2")))
.gcGraceSeconds(0);
@@ -280,6 +282,19 @@ public final class SystemKeyspace
+ "PRIMARY KEY ((keyspace_name), view_name))");
@Deprecated
+ public static final CFMetaData LegacyBatchlog =
+ compile(LEGACY_BATCHLOG,
+ "*DEPRECATED* batchlog entries",
+ "CREATE TABLE %s ("
+ + "id uuid,"
+ + "data blob,"
+ + "version int,"
+ + "written_at timestamp,"
+ + "PRIMARY KEY ((id)))")
+ .compaction(CompactionParams.scts(singletonMap("min_threshold", "2")))
+ .gcGraceSeconds(0);
+
+ @Deprecated
public static final CFMetaData LegacyKeyspaces =
compile(LEGACY_KEYSPACES,
"*DEPRECATED* keyspace definitions",
@@ -409,7 +424,7 @@ public final class SystemKeyspace
{
return Tables.of(BuiltIndexes,
Hints,
- Batchlog,
+ Batches,
Paxos,
Local,
Peers,
@@ -421,6 +436,7 @@ public final class SystemKeyspace
AvailableRanges,
MaterializedViewsBuildsInProgress,
BuiltMaterializedViews,
+ LegacyBatchlog,
LegacyKeyspaces,
LegacyColumnfamilies,
LegacyColumns,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/src/java/org/apache/cassandra/dht/LocalPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
index 2a5a16e..f9421c5 100644
--- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
@@ -66,9 +66,37 @@ public class LocalPartitioner implements IPartitioner
public Token.TokenFactory getTokenFactory()
{
- throw new UnsupportedOperationException();
+ return tokenFactory;
}
+ private final Token.TokenFactory tokenFactory = new Token.TokenFactory()
+ {
+ public ByteBuffer toByteArray(Token token)
+ {
+ return ((LocalToken)token).token;
+ }
+
+ public Token fromByteArray(ByteBuffer bytes)
+ {
+ return new LocalToken(bytes);
+ }
+
+ public String toString(Token token)
+ {
+ return comparator.getString(((LocalToken)token).token);
+ }
+
+ public void validate(String token)
+ {
+ comparator.validate(comparator.fromString(token));
+ }
+
+ public Token fromString(String string)
+ {
+ return new LocalToken(comparator.fromString(string));
+ }
+ };
+
public boolean preservesOrder()
{
return true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 51aa48f..b637b17 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -863,7 +863,7 @@ public class StorageProxy implements StorageProxyMBean
null,
WriteType.SIMPLE);
Mutation mutation = new Mutation(
- PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog,
+ PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
UUIDType.instance.decompose(uuid),
FBUtilities.timestampMicros(),
FBUtilities.nowInSeconds()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index 5f1523e..fbb7a5b 100644
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@ -17,58 +17,74 @@
*/
package org.apache.cassandra.db;
-import java.net.InetAddress;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.partitions.ArrayBackedPartition;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
-import org.junit.BeforeClass;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import com.google.common.collect.Lists;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.Util.PartitionerSwitcher;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.partitions.ArrayBackedPartition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
-
public class BatchlogManagerTest
{
private static final String KEYSPACE1 = "BatchlogManagerTest1";
private static final String CF_STANDARD1 = "Standard1";
private static final String CF_STANDARD2 = "Standard2";
private static final String CF_STANDARD3 = "Standard3";
+ private static final String CF_STANDARD4 = "Standard4";
+
+ static PartitionerSwitcher sw;
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
+ sw = Util.switchPartitioner(Murmur3Partitioner.instance);
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE1,
KeyspaceParams.simple(1),
SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1, 1, BytesType.instance),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD3));
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2, 1, BytesType.instance),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD3, 1, BytesType.instance),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD4, 1, BytesType.instance));
System.out.println(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata.partitionKeyColumns());
}
+ @AfterClass
+ public static void cleanup()
+ {
+ sw.close();
+ }
+
@Before
public void setUp() throws Exception
{
@@ -76,6 +92,8 @@ public class BatchlogManagerTest
InetAddress localhost = InetAddress.getByName("127.0.0.1");
metadata.updateNormalToken(Util.token("A"), localhost);
metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
+ Schema.instance.getColumnFamilyStoreInstance(SystemKeyspace.Batches.cfId).truncateBlocking();
+ Schema.instance.getColumnFamilyStoreInstance(SystemKeyspace.LegacyBatchlog.cfId).truncateBlocking();
}
@Test
@@ -122,18 +140,17 @@ public class BatchlogManagerTest
.build();
long timestamp = i < 500
- ? (System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2) * 1000
- : Long.MAX_VALUE;
-
- Mutation m2 = BatchlogManager.getBatchlogMutationFor(Collections.singleton(m),
- UUIDGen.getTimeUUID(),
- MessagingService.current_version,
- timestamp);
- m2.applyUnsafe();
+ ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
+ : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
+
+ BatchlogManager.getBatchlogMutationFor(Collections.singleton(m),
+ UUIDGen.getTimeUUID(timestamp, i),
+ MessagingService.current_version)
+ .applyUnsafe();
}
// Flush the batchlog to disk (see CASSANDRA-6822).
- Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG).forceBlockingFlush();
+ Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
assertEquals(1000, BatchlogManager.instance.countAllBatches() - initialAllBatches);
assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
@@ -165,25 +182,29 @@ public class BatchlogManagerTest
assertEquals(500, result.one().getLong("count"));
}
- /*
@Test
public void testTruncatedReplay() throws InterruptedException, ExecutionException
{
- CellNameType comparator2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2").metadata.comparator;
- CellNameType comparator3 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard3").metadata.comparator;
+ CFMetaData cf2 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD2);
+ CFMetaData cf3 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD3);
// Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog.
// Each batchlog entry with a mutation for Standard2 and Standard3.
// In the middle of the process, 'truncate' Standard2.
for (int i = 0; i < 1000; i++)
{
- Mutation mutation1 = new Mutation(KEYSPACE1, bytes(i));
- mutation1.add("Standard2", comparator2.makeCellName(bytes(i)), bytes(i), 0);
- Mutation mutation2 = new Mutation(KEYSPACE1, bytes(i));
- mutation2.add("Standard3", comparator3.makeCellName(bytes(i)), bytes(i), 0);
+ Mutation mutation1 = new RowUpdateBuilder(cf2, FBUtilities.timestampMicros(), bytes(i))
+ .clustering("name" + i)
+ .add("val", "val" + i)
+ .build();
+ Mutation mutation2 = new RowUpdateBuilder(cf3, FBUtilities.timestampMicros(), bytes(i))
+ .clustering("name" + i)
+ .add("val", "val" + i)
+ .build();
+
List<Mutation> mutations = Lists.newArrayList(mutation1, mutation2);
// Make sure it's ready to be replayed, so adjust the timestamp.
- long timestamp = System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2;
+ long timestamp = System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout();
if (i == 500)
SystemKeyspace.saveTruncationRecord(Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2"),
@@ -197,14 +218,13 @@ public class BatchlogManagerTest
timestamp--;
BatchlogManager.getBatchlogMutationFor(mutations,
- UUIDGen.getTimeUUID(),
- MessagingService.current_version,
- timestamp * 1000)
+ UUIDGen.getTimeUUID(timestamp, i), // the write time now travels in the batch id's time component
+ MessagingService.current_version)
.applyUnsafe();
}
// Flush the batchlog to disk (see CASSANDRA-6822).
- Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG).forceFlush();
+ Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush(); // blocking, so the flush has finished before replay is forced
// Force batchlog replay and wait for it to complete.
BatchlogManager.instance.startBatchlogReplay().get();
@@ -216,8 +236,8 @@ public class BatchlogManagerTest
if (i >= 500)
{
assertEquals(bytes(i), result.one().getBytes("key"));
- assertEquals(bytes(i), result.one().getBytes("column1"));
- assertEquals(bytes(i), result.one().getBytes("value"));
+ assertEquals("name" + i, result.one().getString("name"));
+ assertEquals("val" + i, result.one().getString("val"));
}
else
{
@@ -229,9 +249,143 @@ public class BatchlogManagerTest
{
UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD3, i));
assertEquals(bytes(i), result.one().getBytes("key"));
- assertEquals(bytes(i), result.one().getBytes("column1"));
- assertEquals(bytes(i), result.one().getBytes("value"));
+ assertEquals("name" + i, result.one().getString("name"));
+ assertEquals("val" + i, result.one().getString("val"));
}
}
- */
+
+ static Mutation fakeVersion12MutationFor(Collection<Mutation> mutations, long now) throws IOException
+ {
+ // Builds a legacy batchlog row mimicking a version 1.2 entry. Serialization can't write 1.2 mutations, so the data is
+ // serialized at VERSION_30 and the row looks old only by its random (non-time) id, explicit written_at and legacy table.
+ UUID uuid = UUID.randomUUID();
+ ByteBuffer writtenAt = LongType.instance.decompose(now);
+ int version = MessagingService.VERSION_30;
+ ByteBuffer data = BatchlogManager.serializeMutations(mutations, version);
+
+ return new RowUpdateBuilder(SystemKeyspace.LegacyBatchlog, FBUtilities.timestampMicros(), uuid)
+ .clustering()
+ .add("written_at", writtenAt)
+ .add("data", data)
+ .add("version", version)
+ .build();
+ }
+
+ static Mutation fakeVersion20MutationFor(Collection<Mutation> mutations, UUID uuid)
+ {
+ // Mimics a version 2.0 entry: saved in the legacy batchlog with written_at derived from the (time-UUID) id; the data itself is serialized at VERSION_30.
+ int version = MessagingService.VERSION_30;
+ ByteBuffer writtenAt = LongType.instance.decompose(UUIDGen.unixTimestamp(uuid));
+ return new RowUpdateBuilder(SystemKeyspace.LegacyBatchlog, FBUtilities.timestampMicros(), uuid)
+ .clustering()
+ .add("data", BatchlogManager.serializeMutations(mutations, version))
+ .add("written_at", writtenAt)
+ .add("version", version)
+ .build();
+ }
+
+ @Test
+ public void testConversion() throws Exception // legacy batchlog entries are converted and replayed by performInitialReplay()
+ {
+ long initialAllBatches = BatchlogManager.instance.countAllBatches();
+ long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();
+ CFMetaData cfm = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD4);
+
+ // Generate 1000 fake version 1.2 mutations and put them all into the legacy batchlog.
+ // Half (500) ready to be replayed, half not.
+ for (int i = 0; i < 1000; i++)
+ {
+ Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
+ .clustering("name" + i)
+ .add("val", "val" + i)
+ .build();
+
+ long timestamp = i < 500
+ ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
+ : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
+
+
+ fakeVersion12MutationFor(Collections.singleton(mutation), timestamp).applyUnsafe();
+ }
+
+ // Add 400 fake version 2.0 mutations and put them all into the legacy batchlog.
+ // Half (200) ready to be replayed, half not.
+ for (int i = 1000; i < 1400; i++)
+ {
+ Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
+ .clustering("name" + i)
+ .add("val", "val" + i)
+ .build();
+
+ long timestamp = i < 1200
+ ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
+ : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
+
+
+ fakeVersion20MutationFor(Collections.singleton(mutation), UUIDGen.getTimeUUID(timestamp, i)).applyUnsafe();
+ }
+
+ // Mix in 100 current version mutations (written to the new batches table), 50 ready for replay.
+ for (int i = 1400; i < 1500; i++)
+ {
+ Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
+ .clustering("name" + i)
+ .add("val", "val" + i)
+ .build();
+
+ long timestamp = i < 1450
+ ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
+ : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
+
+
+ BatchlogManager.getBatchlogMutationFor(Collections.singleton(mutation),
+ UUIDGen.getTimeUUID(timestamp, i),
+ MessagingService.current_version)
+ .applyUnsafe();
+ }
+
+ // Flush both the legacy and the new batchlog tables to disk (see CASSANDRA-6822).
+ Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG).forceBlockingFlush();
+ Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
+
+ assertEquals(100, BatchlogManager.instance.countAllBatches() - initialAllBatches); // before conversion only the new table's 100 entries are counted
+ assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
+
+ UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.LEGACY_BATCHLOG));
+ assertEquals("Count in blog legacy", 1400, result.one().getLong("count"));
+ result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.BATCHES));
+ assertEquals("Count in blog", 100, result.one().getLong("count"));
+
+ // Run the initial replay; the assertions below expect it to move legacy entries into the new table and replay the ready ones.
+ BatchlogManager.instance.performInitialReplay();
+
+ // Ensure that exactly the ready half of each group (500 + 200 + 50 = 750) got replayed, and the other 750 remain.
+ assertEquals(750, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+ assertEquals(750, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
+
+ for (int i = 0; i < 1500; i++)
+ {
+ result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD4, i));
+ if (i < 500 || i >= 1000 && i < 1200 || i >= 1400 && i < 1450) // the "ready for replay" index ranges of the three groups
+ {
+ assertEquals(bytes(i), result.one().getBytes("key"));
+ assertEquals("name" + i, result.one().getString("name"));
+ assertEquals("val" + i, result.one().getString("val"));
+ }
+ else
+ {
+ assertTrue("Present at " + i, result.isEmpty());
+ }
+ }
+
+ // Ensure that no stray mutations got somehow applied.
+ result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", KEYSPACE1, CF_STANDARD4));
+ assertEquals(750, result.one().getLong("count"));
+
+ // Ensure the batchlog is left as expected: unreplayed entries live in the new table, the legacy table is empty.
+ result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.BATCHES));
+ assertEquals("Count in blog after initial replay", 750, result.one().getLong("count"));
+ result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.LEGACY_BATCHLOG));
+ assertEquals("Count in blog legacy after initial replay ", 0, result.one().getLong("count"));
+ }
}
[2/2] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c35bfc09
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c35bfc09
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c35bfc09
Branch: refs/heads/trunk
Commit: c35bfc09cca4add42afed0733a29d6f6843dbba0
Parents: bf8ac1a 762db47
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Aug 6 17:32:39 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Aug 6 17:32:39 2015 +0300
----------------------------------------------------------------------
CHANGES.txt | 2 +
NEWS.txt | 2 +
.../apache/cassandra/db/BatchlogManager.java | 226 +++++++++--------
.../apache/cassandra/db/ColumnFamilyStore.java | 13 +
src/java/org/apache/cassandra/db/Memtable.java | 2 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 28 ++-
.../apache/cassandra/dht/LocalPartitioner.java | 30 ++-
.../apache/cassandra/service/StorageProxy.java | 2 +-
.../cassandra/db/BatchlogManagerTest.java | 246 +++++++++++++++----
9 files changed, 395 insertions(+), 156 deletions(-)
----------------------------------------------------------------------