You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/09/02 09:51:55 UTC
[3/4] cassandra git commit: Improve batchlog write path
Improve batchlog write path
patch by Stefania Alborghetti; reviewed by Aleksey Yeschenko for
CASSANDRA-9673
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/53a177a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/53a177a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/53a177a9
Branch: refs/heads/trunk
Commit: 53a177a9150586e56408f25c959f75110a2997e7
Parents: 5f02f20
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Fri Jul 10 17:03:06 2015 +0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Sep 2 08:43:42 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 1 -
.../org/apache/cassandra/batchlog/Batch.java | 155 +++++
.../batchlog/BatchRemoveVerbHandler.java | 31 +
.../batchlog/BatchStoreVerbHandler.java | 32 +
.../cassandra/batchlog/BatchlogManager.java | 554 +++++++++++++++++
.../batchlog/BatchlogManagerMBean.java | 38 ++
.../batchlog/LegacyBatchlogMigrator.java | 196 ++++++
.../org/apache/cassandra/concurrent/Stage.java | 2 -
.../cassandra/concurrent/StageManager.java | 1 -
.../org/apache/cassandra/config/Config.java | 1 -
.../cassandra/config/DatabaseDescriptor.java | 7 +-
.../cql3/statements/BatchStatement.java | 5 -
.../apache/cassandra/db/BatchlogManager.java | 596 -------------------
.../cassandra/db/BatchlogManagerMBean.java | 38 --
.../db/CounterMutationVerbHandler.java | 2 +-
src/java/org/apache/cassandra/db/Mutation.java | 7 +-
.../cassandra/db/MutationVerbHandler.java | 43 +-
.../cassandra/db/ReadRepairVerbHandler.java | 3 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 2 +-
.../org/apache/cassandra/db/WriteResponse.java | 18 +-
.../cassandra/hints/EncodedHintMessage.java | 6 +-
src/java/org/apache/cassandra/hints/Hint.java | 11 +-
.../org/apache/cassandra/hints/HintMessage.java | 14 +-
.../cassandra/hints/LegacyHintsMigrator.java | 2 +-
.../apache/cassandra/net/MessagingService.java | 26 +-
.../cassandra/service/CassandraDaemon.java | 6 +-
.../apache/cassandra/service/StorageProxy.java | 208 ++++---
.../cassandra/service/StorageService.java | 19 +-
.../service/paxos/CommitVerbHandler.java | 6 +-
.../org/apache/cassandra/tools/NodeProbe.java | 4 +-
.../cql3/MaterializedViewLongTest.java | 2 +-
.../apache/cassandra/batchlog/BatchTest.java | 153 +++++
.../batchlog/BatchlogEndpointFilterTest.java | 115 ++++
.../cassandra/batchlog/BatchlogManagerTest.java | 460 ++++++++++++++
.../cassandra/db/BatchlogManagerTest.java | 394 ------------
.../service/BatchlogEndpointFilterTest.java | 117 ----
37 files changed, 1945 insertions(+), 1331 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fe8f453..751b75d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.0-beta2
+ * Improve batchlog write path (CASSANDRA-9673)
* Re-apply MaterializedView updates on commitlog replay (CASSANDRA-10164)
* Require AbstractType.isByteOrderComparable declaration in constructor (CASSANDRA-9901)
* Avoid digest mismatch on upgrade to 3.0 (CASSANDRA-9554)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 16108bd..0f8b829 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -355,7 +355,6 @@ seed_provider:
concurrent_reads: 32
concurrent_writes: 32
concurrent_counter_writes: 32
-concurrent_batchlog_writes: 32
# For materialized view writes, as there is a read involved, so this should
# be limited by the less of concurrent reads or concurrent writes.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/Batch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/Batch.java b/src/java/org/apache/cassandra/batchlog/Batch.java
new file mode 100644
index 0000000..caa2682
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/Batch.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static org.apache.cassandra.db.TypeSizes.sizeofVInt;
+
+/**
+ * A logged batch: an id, a creation time and the mutations it encloses.
+ *
+ * The mutations are held in exactly one of two forms: decoded {@link Mutation} instances (a 'local' batch,
+ * see {@link #createLocal}) or encoded {@link ByteBuffer}s written with the current messaging version
+ * (a 'remote' batch, see {@link #createRemote}). The other collection is always empty.
+ */
+public final class Batch
+{
+    public static final Serializer serializer = new Serializer();
+
+    public final UUID id;
+    public final long creationTime; // time of batch creation (in microseconds)
+
+    // one of these will always be empty
+    final Collection<Mutation> decodedMutations;
+    final Collection<ByteBuffer> encodedMutations;
+
+    private Batch(UUID id, long creationTime, Collection<Mutation> decodedMutations, Collection<ByteBuffer> encodedMutations)
+    {
+        this.id = id;
+        this.creationTime = creationTime;
+
+        this.decodedMutations = decodedMutations;
+        this.encodedMutations = encodedMutations;
+    }
+
+    /**
+     * Creates a 'local' batch - with all enclosed mutations in decoded form (as Mutation instances)
+     */
+    public static Batch createLocal(UUID id, long creationTime, Collection<Mutation> mutations)
+    {
+        return new Batch(id, creationTime, mutations, Collections.<ByteBuffer>emptyList());
+    }
+
+    /**
+     * Creates a 'remote' batch - with all enclosed mutations in encoded form (as ByteBuffer instances)
+     *
+     * The mutations will always be encoded using the current messaging version.
+     */
+    public static Batch createRemote(UUID id, long creationTime, Collection<ByteBuffer> mutations)
+    {
+        return new Batch(id, creationTime, Collections.<Mutation>emptyList(), mutations);
+    }
+
+    /**
+     * Count of the mutations in the batch.
+     */
+    public int size()
+    {
+        return decodedMutations.size() + encodedMutations.size();
+    }
+
+    /**
+     * Wire format: id, creation time (long), vint mutation count, then each mutation prefixed by its
+     * vint-encoded serialized size. Only 'local' batches can be serialized.
+     */
+    static final class Serializer implements IVersionedSerializer<Batch>
+    {
+        public long serializedSize(Batch batch, int version)
+        {
+            assert batch.encodedMutations.isEmpty() : "attempted to serialize a 'remote' batch";
+
+            long size = UUIDSerializer.serializer.serializedSize(batch.id, version);
+            size += sizeof(batch.creationTime);
+
+            size += sizeofVInt(batch.decodedMutations.size());
+            for (Mutation mutation : batch.decodedMutations)
+            {
+                // kept as a long: serialize() writes this exact (untruncated) value as the vint prefix
+                long mutationSize = Mutation.serializer.serializedSize(mutation, version);
+                size += sizeofVInt(mutationSize);
+                size += mutationSize;
+            }
+
+            return size;
+        }
+
+        public void serialize(Batch batch, DataOutputPlus out, int version) throws IOException
+        {
+            assert batch.encodedMutations.isEmpty() : "attempted to serialize a 'remote' batch";
+
+            UUIDSerializer.serializer.serialize(batch.id, out, version);
+            out.writeLong(batch.creationTime);
+
+            out.writeVInt(batch.decodedMutations.size());
+            for (Mutation mutation : batch.decodedMutations)
+            {
+                out.writeVInt(Mutation.serializer.serializedSize(mutation, version));
+                Mutation.serializer.serialize(mutation, out, version);
+            }
+        }
+
+        /**
+         * Reads a batch. With the current messaging version the mutations are kept encoded (a 'remote' batch);
+         * otherwise they are decoded (a 'local' batch).
+         */
+        public Batch deserialize(DataInputPlus in, int version) throws IOException
+        {
+            UUID id = UUIDSerializer.serializer.deserialize(in, version);
+            long creationTime = in.readLong();
+
+            /*
+             * A 'remote' batch must hold mutations encoded with the current messaging version. If the
+             * version doesn't match the current one, we cannot just keep the encoded mutations verbatim,
+             * so we decode them instead, to deal with compatibility.
+             */
+            return version == MessagingService.current_version
+                   ? createRemote(id, creationTime, readEncodedMutations(in))
+                   : createLocal(id, creationTime, decodeMutations(in, version));
+        }
+
+        // Reads each mutation as a raw vint-length-prefixed buffer without decoding it.
+        private static Collection<ByteBuffer> readEncodedMutations(DataInputPlus in) throws IOException
+        {
+            int count = (int) in.readVInt();
+
+            ArrayList<ByteBuffer> mutations = new ArrayList<>(count);
+            for (int i = 0; i < count; i++)
+                mutations.add(ByteBufferUtil.readWithVIntLength(in));
+
+            return mutations;
+        }
+
+        // Decodes each mutation using the sender's version; the size prefix is read and ignored.
+        private static Collection<Mutation> decodeMutations(DataInputPlus in, int version) throws IOException
+        {
+            int count = (int) in.readVInt();
+
+            ArrayList<Mutation> mutations = new ArrayList<>(count);
+            for (int i = 0; i < count; i++)
+            {
+                in.readVInt(); // skip mutation size
+                mutations.add(Mutation.serializer.deserialize(in, version));
+            }
+
+            return mutations;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java b/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java
new file mode 100644
index 0000000..3c3fcec
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.util.UUID;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+
+/**
+ * Handles a request to drop a batch from the local batchlog. The message payload is the batch id,
+ * which is handed to {@link BatchlogManager#remove(UUID)}; no reply is sent back.
+ */
+public final class BatchRemoveVerbHandler implements IVerbHandler<UUID>
+{
+    public void doVerb(MessageIn<UUID> message, int id)
+    {
+        UUID batchId = message.payload;
+        BatchlogManager.remove(batchId);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java b/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java
new file mode 100644
index 0000000..4bc878c
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import org.apache.cassandra.db.WriteResponse;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Handles a request to write a batch into the local batchlog, then acknowledges it to the sender
+ * with a write response.
+ */
+public final class BatchStoreVerbHandler implements IVerbHandler<Batch>
+{
+    public void doVerb(MessageIn<Batch> message, int id)
+    {
+        Batch batch = message.payload;
+        BatchlogManager.store(batch);
+
+        // acknowledge only once the batch has been stored locally
+        MessagingService messaging = MessagingService.instance();
+        messaging.sendReply(WriteResponse.createMessage(), id, message.from);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
new file mode 100644
index 0000000..934ebaa
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.*;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.*;
+import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.hints.Hint;
+import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.WriteResponseHandler;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static com.google.common.collect.Iterables.transform;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging;
+
+public class BatchlogManager implements BatchlogManagerMBean
+{
+ public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
+ private static final long REPLAY_INTERVAL = 10 * 1000; // milliseconds
+ static final int DEFAULT_PAGE_SIZE = 128;
+
+ private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
+ public static final BatchlogManager instance = new BatchlogManager();
+
+ private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread.
+ private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0);
+
+ // Single-thread executor service for scheduling and serializing log replay.
+ private final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
+
+ public void start()
+ {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try
+ {
+ mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ batchlogTasks.scheduleWithFixedDelay(this::replayFailedBatches,
+ StorageService.RING_DELAY,
+ REPLAY_INTERVAL,
+ TimeUnit.MILLISECONDS);
+ }
+
+ public void shutdown() throws InterruptedException
+ {
+ batchlogTasks.shutdown();
+ batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
+ }
+
+ public static void remove(UUID id)
+ {
+ new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
+ UUIDType.instance.decompose(id),
+ FBUtilities.timestampMicros(),
+ FBUtilities.nowInSeconds()))
+ .apply();
+ }
+
+ public static void store(Batch batch)
+ {
+ store(batch, true);
+ }
+
+ public static void store(Batch batch, boolean durableWrites)
+ {
+ RowUpdateBuilder builder =
+ new RowUpdateBuilder(SystemKeyspace.Batches, batch.creationTime, batch.id)
+ .clustering()
+ .add("version", MessagingService.current_version);
+
+ for (ByteBuffer mutation : batch.encodedMutations)
+ builder.addListEntry("mutations", mutation);
+
+ for (Mutation mutation : batch.decodedMutations)
+ {
+ try (DataOutputBuffer buffer = new DataOutputBuffer())
+ {
+ Mutation.serializer.serialize(mutation, buffer, MessagingService.current_version);
+ builder.addListEntry("mutations", buffer.buffer());
+ }
+ catch (IOException e)
+ {
+ // shouldn't happen
+ throw new AssertionError(e);
+ }
+ }
+
+ builder.build().apply(durableWrites);
+ }
+
+ @VisibleForTesting
+ public int countAllBatches()
+ {
+ String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHES);
+ UntypedResultSet results = executeInternal(query);
+ if (results == null || results.isEmpty())
+ return 0;
+
+ return (int) results.one().getLong("count");
+ }
+
+ public long getTotalBatchesReplayed()
+ {
+ return totalBatchesReplayed;
+ }
+
+ public void forceBatchlogReplay() throws Exception
+ {
+ startBatchlogReplay().get();
+ }
+
+ public Future<?> startBatchlogReplay()
+ {
+ // If a replay is already in progress this request will be executed after it completes.
+ return batchlogTasks.submit(this::replayFailedBatches);
+ }
+
+ void performInitialReplay() throws InterruptedException, ExecutionException
+ {
+ // Invokes initial replay. Used for testing only.
+ batchlogTasks.submit(this::replayFailedBatches).get();
+ }
+
+ private void replayFailedBatches()
+ {
+ logger.debug("Started replayFailedBatches");
+
+ // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
+ // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
+ int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
+ RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
+
+ UUID limitUuid = UUIDGen.maxTimeUUID(System.currentTimeMillis() - getBatchlogTimeout());
+ ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES);
+ int pageSize = calculatePageSize(store);
+ // There cannot be any live content where token(id) <= token(lastReplayedUuid) as every processed batch is
+ // deleted, but the tombstoned content may still be present in the tables. To avoid walking over it we specify
+ // token(id) > token(lastReplayedUuid) as part of the query.
+ String query = String.format("SELECT id, mutations, version FROM %s.%s WHERE token(id) > token(?) AND token(id) <= token(?)",
+ SystemKeyspace.NAME,
+ SystemKeyspace.BATCHES);
+ UntypedResultSet batches = executeInternalWithPaging(query, pageSize, lastReplayedUuid, limitUuid);
+ processBatchlogEntries(batches, pageSize, rateLimiter);
+ lastReplayedUuid = limitUuid;
+ logger.debug("Finished replayFailedBatches");
+ }
+
+ // read less rows (batches) per page if they are very large
+ static int calculatePageSize(ColumnFamilyStore store)
+ {
+ double averageRowSize = store.getMeanPartitionSize();
+ if (averageRowSize <= 0)
+ return DEFAULT_PAGE_SIZE;
+
+ return (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, 4 * 1024 * 1024 / averageRowSize));
+ }
+
+ private void processBatchlogEntries(UntypedResultSet batches, int pageSize, RateLimiter rateLimiter)
+ {
+ int positionInPage = 0;
+ ArrayList<ReplayingBatch> unfinishedBatches = new ArrayList<>(pageSize);
+
+ Set<InetAddress> hintedNodes = new HashSet<>();
+ Set<UUID> replayedBatches = new HashSet<>();
+
+ // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
+ for (UntypedResultSet.Row row : batches)
+ {
+ UUID id = row.getUUID("id");
+ int version = row.getInt("version");
+ try
+ {
+ ReplayingBatch batch = new ReplayingBatch(id, version, row.getList("mutations", BytesType.instance));
+ if (batch.replay(rateLimiter, hintedNodes) > 0)
+ {
+ unfinishedBatches.add(batch);
+ }
+ else
+ {
+ remove(id); // no write mutations were sent (either expired or all CFs involved truncated).
+ ++totalBatchesReplayed;
+ }
+ }
+ catch (IOException e)
+ {
+ logger.warn("Skipped batch replay of {} due to {}", id, e);
+ remove(id);
+ }
+
+ if (++positionInPage == pageSize)
+ {
+ // We have reached the end of a batch. To avoid keeping more than a page of mutations in memory,
+ // finish processing the page before requesting the next row.
+ finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
+ positionInPage = 0;
+ }
+ }
+
+ finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
+
+ // to preserve batch guarantees, we must ensure that hints (if any) have made it to disk, before deleting the batches
+ HintsService.instance.flushAndFsyncBlockingly(transform(hintedNodes, StorageService.instance::getHostIdForEndpoint));
+
+ // once all generated hints are fsynced, actually delete the batches
+ replayedBatches.forEach(BatchlogManager::remove);
+ }
+
+ private void finishAndClearBatches(ArrayList<ReplayingBatch> batches, Set<InetAddress> hintedNodes, Set<UUID> replayedBatches)
+ {
+ // schedule hints for timed out deliveries
+ for (ReplayingBatch batch : batches)
+ {
+ batch.finish(hintedNodes);
+ replayedBatches.add(batch.id);
+ }
+
+ totalBatchesReplayed += batches.size();
+ batches.clear();
+ }
+
+ public static long getBatchlogTimeout()
+ {
+ return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
+ }
+
+ private static class ReplayingBatch
+ {
+ private final UUID id;
+ private final long writtenAt;
+ private final List<Mutation> mutations;
+ private final int replayedBytes;
+
+ private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
+
+ ReplayingBatch(UUID id, int version, List<ByteBuffer> serializedMutations) throws IOException
+ {
+ this.id = id;
+ this.writtenAt = UUIDGen.unixTimestamp(id);
+ this.mutations = new ArrayList<>(serializedMutations.size());
+ this.replayedBytes = addMutations(version, serializedMutations);
+ }
+
+ public int replay(RateLimiter rateLimiter, Set<InetAddress> hintedNodes) throws IOException
+ {
+ logger.debug("Replaying batch {}", id);
+
+ if (mutations.isEmpty())
+ return 0;
+
+ int gcgs = gcgs(mutations);
+ if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
+ return 0;
+
+ replayHandlers = sendReplays(mutations, writtenAt, hintedNodes);
+
+ rateLimiter.acquire(replayedBytes); // acquire afterwards, to not mess up ttl calculation.
+
+ return replayHandlers.size();
+ }
+
+ public void finish(Set<InetAddress> hintedNodes)
+ {
+ for (int i = 0; i < replayHandlers.size(); i++)
+ {
+ ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
+ try
+ {
+ handler.get();
+ }
+ catch (WriteTimeoutException|WriteFailureException e)
+ {
+ logger.debug("Failed replaying a batched mutation to a node, will write a hint");
+ logger.debug("Failure was : {}", e.getMessage());
+ // writing hints for the rest to hints, starting from i
+ writeHintsForUndeliveredEndpoints(i, hintedNodes);
+ return;
+ }
+ }
+ }
+
+ private int addMutations(int version, List<ByteBuffer> serializedMutations) throws IOException
+ {
+ int ret = 0;
+ for (ByteBuffer serializedMutation : serializedMutations)
+ {
+ ret += serializedMutation.remaining();
+ try (DataInputBuffer in = new DataInputBuffer(serializedMutation, true))
+ {
+ addMutation(Mutation.serializer.deserialize(in, version));
+ }
+ }
+
+ return ret;
+ }
+
+ // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
+ // We don't abort the replay entirely b/c this can be considered a success (truncated is same as delivered then
+ // truncated.
+ private void addMutation(Mutation mutation)
+ {
+ for (UUID cfId : mutation.getColumnFamilyIds())
+ if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
+ mutation = mutation.without(cfId);
+
+ if (!mutation.isEmpty())
+ mutations.add(mutation);
+ }
+
+ private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddress> hintedNodes)
+ {
+ int gcgs = gcgs(mutations);
+
+ // expired
+ if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
+ return;
+
+ for (int i = startFrom; i < replayHandlers.size(); i++)
+ {
+ ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
+ Mutation undeliveredMutation = mutations.get(i);
+
+ if (handler != null)
+ {
+ hintedNodes.addAll(handler.undelivered);
+ HintsService.instance.write(transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint),
+ Hint.create(undeliveredMutation, writtenAt));
+ }
+ }
+ }
+
+ private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations,
+ long writtenAt,
+ Set<InetAddress> hintedNodes)
+ {
+ List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
+ for (Mutation mutation : mutations)
+ {
+ ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, hintedNodes);
+ if (handler != null)
+ handlers.add(handler);
+ }
+ return handlers;
+ }
+
+ /**
+ * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
+ * when a replica is down or a write request times out.
+ *
+ * @return direct delivery handler to wait on or null, if no live nodes found
+ */
+ private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation,
+ long writtenAt,
+ Set<InetAddress> hintedNodes)
+ {
+ Set<InetAddress> liveEndpoints = new HashSet<>();
+ String ks = mutation.getKeyspaceName();
+ Token tk = mutation.key().getToken();
+
+ for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
+ StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
+ {
+ if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ {
+ mutation.apply();
+ }
+ else if (FailureDetector.instance.isAlive(endpoint))
+ {
+ liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
+ }
+ else
+ {
+ hintedNodes.add(endpoint);
+ HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(endpoint),
+ Hint.create(mutation, writtenAt));
+ }
+ }
+
+ if (liveEndpoints.isEmpty())
+ return null;
+
+ ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints);
+ MessageOut<Mutation> message = mutation.createMessage();
+ for (InetAddress endpoint : liveEndpoints)
+ MessagingService.instance().sendRR(message, endpoint, handler, false);
+ return handler;
+ }
+
+ private static int gcgs(Collection<Mutation> mutations)
+ {
+ int gcgs = Integer.MAX_VALUE;
+ for (Mutation mutation : mutations)
+ gcgs = Math.min(gcgs, mutation.smallestGCGS());
+ return gcgs;
+ }
+
+ /**
+ * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from
+ * which we did not receive a successful reply.
+ */
+ private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
+ {
+ private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+ ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
+ {
+ super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
+ undelivered.addAll(writeEndpoints);
+ }
+
+ @Override
+ protected int totalBlockFor()
+ {
+ return this.naturalEndpoints.size();
+ }
+
+ @Override
+ public void response(MessageIn<T> m)
+ {
+ boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from);
+ assert removed;
+ super.response(m);
+ }
+ }
+ }
+
+ public static class EndpointFilter
+ {
+ private final String localRack;
+ private final Multimap<String, InetAddress> endpoints;
+
+ public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
+ {
+ this.localRack = localRack;
+ this.endpoints = endpoints;
+ }
+
+ /**
+ * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
+ */
+ public Collection<InetAddress> filter()
+ {
+ // special case for single-node data centers
+ if (endpoints.values().size() == 1)
+ return endpoints.values();
+
+ // strip out dead endpoints and localhost
+ ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
+ for (Map.Entry<String, InetAddress> entry : endpoints.entries())
+ if (isValid(entry.getValue()))
+ validated.put(entry.getKey(), entry.getValue());
+
+ if (validated.size() <= 2)
+ return validated.values();
+
+ if (validated.size() - validated.get(localRack).size() >= 2)
+ {
+ // we have enough endpoints in other racks
+ validated.removeAll(localRack);
+ }
+
+ if (validated.keySet().size() == 1)
+ {
+ // we have only 1 `other` rack
+ Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values());
+ return Lists.newArrayList(Iterables.limit(otherRack, 2));
+ }
+
+ // randomize which racks we pick from if more than 2 remaining
+ Collection<String> racks;
+ if (validated.keySet().size() == 2)
+ {
+ racks = validated.keySet();
+ }
+ else
+ {
+ racks = Lists.newArrayList(validated.keySet());
+ Collections.shuffle((List<String>) racks);
+ }
+
+ // grab a random member of up to two racks
+ List<InetAddress> result = new ArrayList<>(2);
+ for (String rack : Iterables.limit(racks, 2))
+ {
+ List<InetAddress> rackMembers = validated.get(rack);
+ result.add(rackMembers.get(getRandomInt(rackMembers.size())));
+ }
+
+ return result;
+ }
+
+ @VisibleForTesting
+ protected boolean isValid(InetAddress input)
+ {
+ return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
+ }
+
+ @VisibleForTesting
+ protected int getRandomInt(int bound)
+ {
+ return ThreadLocalRandom.current().nextInt(bound);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java b/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java
new file mode 100644
index 0000000..4dcc9f2
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+/**
+ * JMX management interface exposed by the batchlog manager.
+ */
+public interface BatchlogManagerMBean
+{
+    /**
+     * Counts all batches currently in the batchlog.
+     *
+     * @return total batch count
+     */
+    public int countAllBatches();
+
+    /**
+     * @return total count of batches replayed since node start
+     */
+    public long getTotalBatchesReplayed();
+
+    /**
+     * Forces a batchlog replay and blocks until that replay has completed.
+     * <p>
+     * If a replay is already in progress this call does NOT return immediately: the request is queued
+     * behind the in-progress replay and runs once it has finished.
+     *
+     * @throws Exception if waiting for the replay fails, e.g. the wait is interrupted or the replay task threw
+     */
+    public void forceBatchlogReplay() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
new file mode 100644
index 0000000..13ff81a
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
+import org.apache.cassandra.service.WriteResponseHandler;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * Bridges the legacy batchlog table and the new batch storage: migrates legacy rows on startup, converts
+ * legacy batchlog mutations received from other nodes, and builds legacy-format store/remove mutations
+ * for endpoints that still use the old format.
+ */
+public final class LegacyBatchlogMigrator
+{
+    private static final Logger logger = LoggerFactory.getLogger(LegacyBatchlogMigrator.class);
+
+    private LegacyBatchlogMigrator()
+    {
+        // static class
+    }
+
+    /**
+     * Converts every row of the legacy batchlog table ({@code SystemKeyspace.LEGACY_BATCHLOG}) into the new
+     * batch storage.
+     * <p>
+     * Rows are read in pages sized by {@code BatchlogManager.calculatePageSize}. Each row is converted by
+     * {@link #apply}; a row that fails to convert is logged and skipped. If at least one row was converted,
+     * the legacy table is then truncated, which also discards any rows that failed to convert.
+     */
+    @SuppressWarnings("deprecation")
+    public static void migrate()
+    {
+        ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG);
+
+        // nothing to migrate
+        if (store.isEmpty())
+            return;
+
+        logger.info("Migrating legacy batchlog to new storage");
+
+        int convertedBatches = 0;
+        String query = String.format("SELECT id, data, written_at, version FROM %s.%s",
+                                     SystemKeyspace.NAME,
+                                     SystemKeyspace.LEGACY_BATCHLOG);
+
+        int pageSize = BatchlogManager.calculatePageSize(store);
+
+        UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, pageSize);
+        for (UntypedResultSet.Row row : rows)
+        {
+            // the running count doubles as the sequence used to mint distinct time UUIDs for non-time-based ids
+            if (apply(row, convertedBatches))
+                convertedBatches++;
+        }
+
+        // truncate through the store resolved above instead of looking the same table up a second time
+        if (convertedBatches > 0)
+            store.truncateBlocking();
+    }
+
+    /**
+     * @param mutation mutation to inspect
+     * @return {@code true} if {@code mutation} targets the system keyspace and carries an update for the
+     *         legacy batchlog table
+     */
+    @SuppressWarnings("deprecation")
+    public static boolean isLegacyBatchlogMutation(Mutation mutation)
+    {
+        return mutation.getKeyspaceName().equals(SystemKeyspace.NAME)
+            && mutation.getPartitionUpdate(SystemKeyspace.LegacyBatchlog.cfId) != null;
+    }
+
+    /**
+     * Converts each row of the legacy batchlog update carried by {@code mutation} and stores it as a
+     * new-format batch via {@link #apply}.
+     * <p>
+     * A counter of -1 is passed to {@link #apply}; it is only used when a row's id is not a version 1
+     * (time-based) UUID.
+     *
+     * @param mutation a mutation for which {@link #isLegacyBatchlogMutation} holds
+     */
+    @SuppressWarnings("deprecation")
+    public static void handleLegacyMutation(Mutation mutation)
+    {
+        PartitionUpdate update = mutation.getPartitionUpdate(SystemKeyspace.LegacyBatchlog.cfId);
+        logger.debug("Applying legacy batchlog mutation {}", update);
+        update.forEach(row -> apply(UntypedResultSet.Row.fromInternalRow(update.metadata(), update.partitionKey(), row), -1));
+    }
+
+    /**
+     * Converts one legacy batchlog row and stores it through {@code BatchlogManager.store}.
+     * <p>
+     * The batch timestamp (milliseconds) is taken from the id when it is a version 1 (time-based) UUID and
+     * from the {@code written_at} column otherwise; in the latter case a new time UUID is generated from that
+     * timestamp and {@code counter}. A missing {@code version} column is treated as
+     * {@code MessagingService.VERSION_12}.
+     *
+     * @param row     a legacy batchlog row
+     * @param counter sequence component for a generated time UUID; ignored when the id is already time-based
+     * @return {@code true} if the batch was converted and stored, {@code false} if conversion failed
+     *         (the failure is logged)
+     */
+    private static boolean apply(UntypedResultSet.Row row, long counter)
+    {
+        UUID id = row.getUUID("id");
+        long timestamp = id.version() == 1 ? UUIDGen.unixTimestamp(id) : row.getLong("written_at");
+        int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
+
+        if (id.version() != 1)
+            id = UUIDGen.getTimeUUID(timestamp, counter);
+
+        logger.debug("Converting mutation at {}", timestamp);
+
+        // Read through a duplicate of the row's buffer: deserializing must not advance the position of a
+        // ByteBuffer the caller still owns (for handleLegacyMutation, presumably the cells of the incoming update).
+        try (DataInputBuffer in = new DataInputBuffer(row.getBytes("data"), true))
+        {
+            // serialized layout: an int count followed by that many mutations
+            int numMutations = in.readInt();
+            List<Mutation> mutations = new ArrayList<>(numMutations);
+            for (int i = 0; i < numMutations; i++)
+                mutations.add(Mutation.serializer.deserialize(in, version));
+
+            // the legacy timestamp is in milliseconds; the batch creation time is given in microseconds
+            BatchlogManager.store(Batch.createLocal(id, TimeUnit.MILLISECONDS.toMicros(timestamp), mutations));
+            return true;
+        }
+        catch (Throwable t)
+        {
+            logger.error("Failed to convert mutation {} at timestamp {}", id, timestamp, t);
+            return false;
+        }
+    }
+
+    /**
+     * Sends {@code batch} to each endpoint as a legacy batchlog store mutation, serialized for that
+     * endpoint's messaging version, with {@code handler} as the response callback.
+     * <p>
+     * NOTE(review): this method only sends the requests; waiting on {@code handler} is presumably left to
+     * the caller.
+     */
+    public static void syncWriteToBatchlog(WriteResponseHandler<?> handler, Batch batch, Collection<InetAddress> endpoints)
+    throws WriteTimeoutException, WriteFailureException
+    {
+        for (InetAddress target : endpoints)
+        {
+            logger.debug("Sending legacy batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
+
+            int targetVersion = MessagingService.instance().getVersion(target);
+            MessagingService.instance().sendRR(getStoreMutation(batch, targetVersion).createMessage(MessagingService.Verb.MUTATION),
+                                               target,
+                                               handler,
+                                               false);
+        }
+    }
+
+    /**
+     * Sends a legacy batchlog removal (a full partition deletion of {@code uuid}) to each endpoint without
+     * waiting for the responses, which are tracked by a fresh handler at {@code ConsistencyLevel.ANY}.
+     */
+    public static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
+    {
+        AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
+                                                                                     Collections.<InetAddress>emptyList(),
+                                                                                     ConsistencyLevel.ANY,
+                                                                                     Keyspace.open(SystemKeyspace.NAME),
+                                                                                     null,
+                                                                                     WriteType.SIMPLE);
+        Mutation mutation = getRemoveMutation(uuid);
+
+        for (InetAddress target : endpoints)
+        {
+            logger.debug("Sending legacy batchlog remove request {} to {}", uuid, target);
+            MessagingService.instance().sendRR(mutation.createMessage(MessagingService.Verb.MUTATION), target, handler, false);
+        }
+    }
+
+    /** Applies the legacy-format store mutation for {@code batch}, serializing its mutations with {@code version}. */
+    static void store(Batch batch, int version)
+    {
+        getStoreMutation(batch, version).apply();
+    }
+
+    /**
+     * Builds the legacy batchlog row for {@code batch}. The row is keyed by the batch id and written at the
+     * batch creation time (microseconds). {@code written_at} holds that time in milliseconds.
+     */
+    @SuppressWarnings("deprecation")
+    static Mutation getStoreMutation(Batch batch, int version)
+    {
+        return new RowUpdateBuilder(SystemKeyspace.LegacyBatchlog, batch.creationTime, batch.id)
+               .clustering()
+               .add("written_at", new Date(batch.creationTime / 1000))
+               .add("data", getSerializedMutations(version, batch.decodedMutations))
+               .add("version", version)
+               .build();
+    }
+
+    /** Builds a full partition deletion, at the current time, of the legacy batchlog entry {@code uuid}. */
+    @SuppressWarnings("deprecation")
+    private static Mutation getRemoveMutation(UUID uuid)
+    {
+        return new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.LegacyBatchlog,
+                                                                UUIDType.instance.decompose(uuid),
+                                                                FBUtilities.timestampMicros(),
+                                                                FBUtilities.nowInSeconds()));
+    }
+
+    /**
+     * Serializes {@code mutations} in the legacy layout: an int count followed by each mutation in the
+     * {@code version} wire format. An {@link IOException} is rethrown wrapped in a {@link RuntimeException}.
+     */
+    private static ByteBuffer getSerializedMutations(int version, Collection<Mutation> mutations)
+    {
+        try (DataOutputBuffer buf = new DataOutputBuffer())
+        {
+            buf.writeInt(mutations.size());
+            for (Mutation mutation : mutations)
+                Mutation.serializer.serialize(mutation, buf, version);
+            return buf.buffer();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/concurrent/Stage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/Stage.java b/src/java/org/apache/cassandra/concurrent/Stage.java
index e91c515..a57587c 100644
--- a/src/java/org/apache/cassandra/concurrent/Stage.java
+++ b/src/java/org/apache/cassandra/concurrent/Stage.java
@@ -27,7 +27,6 @@ public enum Stage
READ,
MUTATION,
COUNTER_MUTATION,
- BATCHLOG_MUTATION,
MATERIALIZED_VIEW_MUTATION,
GOSSIP,
REQUEST_RESPONSE,
@@ -62,7 +61,6 @@ public enum Stage
return "internal";
case MUTATION:
case COUNTER_MUTATION:
- case BATCHLOG_MUTATION:
case MATERIALIZED_VIEW_MUTATION:
case READ:
case REQUEST_RESPONSE:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index ca83829..ee1fbe5 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -47,7 +47,6 @@ public class StageManager
{
stages.put(Stage.MUTATION, multiThreadedLowSignalStage(Stage.MUTATION, getConcurrentWriters()));
stages.put(Stage.COUNTER_MUTATION, multiThreadedLowSignalStage(Stage.COUNTER_MUTATION, getConcurrentCounterWriters()));
- stages.put(Stage.BATCHLOG_MUTATION, multiThreadedLowSignalStage(Stage.BATCHLOG_MUTATION, getConcurrentBatchlogWriters()));
stages.put(Stage.MATERIALIZED_VIEW_MUTATION, multiThreadedLowSignalStage(Stage.MATERIALIZED_VIEW_MUTATION, getConcurrentMaterializedViewWriters()));
stages.put(Stage.READ, multiThreadedLowSignalStage(Stage.READ, getConcurrentReaders()));
stages.put(Stage.REQUEST_RESPONSE, multiThreadedLowSignalStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 9d55fc8..22b09d3 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -93,7 +93,6 @@ public class Config
public Integer concurrent_reads = 32;
public Integer concurrent_writes = 32;
public Integer concurrent_counter_writes = 32;
- public Integer concurrent_batchlog_writes = 32;
public Integer concurrent_materialized_view_writes = 32;
@Deprecated
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4e13911..31a4e9d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1080,8 +1080,9 @@ public class DatabaseDescriptor
case PAXOS_COMMIT:
case PAXOS_PREPARE:
case PAXOS_PROPOSE:
- case BATCHLOG_MUTATION:
case HINT:
+ case BATCH_STORE:
+ case BATCH_REMOVE:
return getWriteRpcTimeout();
case COUNTER_MUTATION:
return getCounterWriteRpcTimeout();
@@ -1128,10 +1129,6 @@ public class DatabaseDescriptor
return conf.concurrent_counter_writes;
}
- public static int getConcurrentBatchlogWriters()
- {
- return conf.concurrent_batchlog_writes;
- }
public static int getConcurrentMaterializedViewWriters()
{
return conf.concurrent_materialized_view_writes;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 5de4b6c..c8482b3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -524,11 +524,6 @@ public class BatchStatement implements CQLStatement
}
}
- public interface BatchVariables
- {
- public List<ByteBuffer> getVariablesForStatement(int statementInBatch);
- }
-
public String toString()
{
return String.format("BatchStatement(type=%s, statements=%s)", type, statements);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
deleted file mode 100644
index de85925..0000000
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ /dev/null
@@ -1,596 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.*;
-import com.google.common.util.concurrent.RateLimiter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.WriteFailureException;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.hints.Hint;
-import org.apache.cassandra.hints.HintsService;
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.WriteResponseHandler;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
-
-import static com.google.common.collect.Iterables.transform;
-import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
-import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging;
-
-public class BatchlogManager implements BatchlogManagerMBean
-{
- public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
- private static final long REPLAY_INTERVAL = 10 * 1000; // milliseconds
- private static final int DEFAULT_PAGE_SIZE = 128;
-
- private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
- public static final BatchlogManager instance = new BatchlogManager();
-
- private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread.
- private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0);
-
- // Single-thread executor service for scheduling and serializing log replay.
- private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
-
- public void start()
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
-
- batchlogTasks.schedule(this::replayInitially, StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
-
- batchlogTasks.scheduleWithFixedDelay(this::replayAllFailedBatches,
- StorageService.RING_DELAY + REPLAY_INTERVAL,
- REPLAY_INTERVAL,
- TimeUnit.MILLISECONDS);
- }
-
- private void replayInitially()
- {
- // Initial run must take care of non-time-uuid batches as written by Version 1.2.
- convertOldBatchEntries();
-
- replayAllFailedBatches();
- }
-
- public static void shutdown() throws InterruptedException
- {
- batchlogTasks.shutdown();
- batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
- }
-
- public int countAllBatches()
- {
- String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHES);
- UntypedResultSet results = executeInternal(query);
- if (results.isEmpty())
- return 0;
- return (int) results.one().getLong("count");
- }
-
- public long getTotalBatchesReplayed()
- {
- return totalBatchesReplayed;
- }
-
- public void forceBatchlogReplay() throws Exception
- {
- startBatchlogReplay().get();
- }
-
- public Future<?> startBatchlogReplay()
- {
- // If a replay is already in progress this request will be executed after it completes.
- return batchlogTasks.submit(this::replayAllFailedBatches);
- }
-
- void performInitialReplay() throws InterruptedException, ExecutionException
- {
- // Invokes initial replay. Used for testing only.
- batchlogTasks.submit(this::replayInitially).get();
- }
-
- public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version)
- {
- return new RowUpdateBuilder(SystemKeyspace.Batches, FBUtilities.timestampMicros(), uuid)
- .clustering()
- .add("data", serializeMutations(mutations, version))
- .add("version", version)
- .build();
- }
-
- @VisibleForTesting
- static ByteBuffer serializeMutations(Collection<Mutation> mutations, int version)
- {
- try (DataOutputBuffer buf = new DataOutputBuffer())
- {
- buf.writeInt(mutations.size());
- for (Mutation mutation : mutations)
- Mutation.serializer.serialize(mutation, buf, version);
- return buf.buffer();
- }
- catch (IOException e)
- {
- throw new AssertionError(); // cannot happen.
- }
- }
-
- private void replayAllFailedBatches()
- {
- logger.debug("Started replayAllFailedBatches");
-
- // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
- // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
- int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
- RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
-
- UUID limitUuid = UUIDGen.maxTimeUUID(System.currentTimeMillis() - getBatchlogTimeout());
- int pageSize = calculatePageSize();
- // There cannot be any live content where token(id) <= token(lastReplayedUuid) as every processed batch is
- // deleted, but the tombstoned content may still be present in the tables. To avoid walking over it we specify
- // token(id) > token(lastReplayedUuid) as part of the query.
- String query = String.format("SELECT id, data, version FROM %s.%s WHERE token(id) > token(?) AND token(id) <= token(?)",
- SystemKeyspace.NAME,
- SystemKeyspace.BATCHES);
- UntypedResultSet batches = executeInternalWithPaging(query, pageSize, lastReplayedUuid, limitUuid);
- processBatchlogEntries(batches, pageSize, rateLimiter);
- lastReplayedUuid = limitUuid;
- logger.debug("Finished replayAllFailedBatches");
- }
-
- // read less rows (batches) per page if they are very large
- private static int calculatePageSize()
- {
- ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES);
- double averageRowSize = store.getMeanPartitionSize();
- if (averageRowSize <= 0)
- return DEFAULT_PAGE_SIZE;
-
- return (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, 4 * 1024 * 1024 / averageRowSize));
- }
-
- private static void deleteBatch(UUID id)
- {
- Mutation mutation = new Mutation(
- PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
- UUIDType.instance.decompose(id),
- FBUtilities.timestampMicros(),
- FBUtilities.nowInSeconds()));
- mutation.apply();
- }
-
- private void processBatchlogEntries(UntypedResultSet batches, int pageSize, RateLimiter rateLimiter)
- {
- int positionInPage = 0;
- ArrayList<Batch> unfinishedBatches = new ArrayList<>(pageSize);
-
- Set<InetAddress> hintedNodes = new HashSet<>();
- Set<UUID> replayedBatches = new HashSet<>();
-
- // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
- for (UntypedResultSet.Row row : batches)
- {
- UUID id = row.getUUID("id");
- int version = row.getInt("version");
- Batch batch = new Batch(id, row.getBytes("data"), version);
- try
- {
- if (batch.replay(rateLimiter, hintedNodes) > 0)
- {
- unfinishedBatches.add(batch);
- }
- else
- {
- deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated).
- ++totalBatchesReplayed;
- }
- }
- catch (IOException e)
- {
- logger.warn("Skipped batch replay of {} due to {}", id, e);
- deleteBatch(id);
- }
-
- if (++positionInPage == pageSize)
- {
- // We have reached the end of a batch. To avoid keeping more than a page of mutations in memory,
- // finish processing the page before requesting the next row.
- finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
- positionInPage = 0;
- }
- }
-
- finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
-
- // to preserve batch guarantees, we must ensure that hints (if any) have made it to disk, before deleting the batches
- HintsService.instance.flushAndFsyncBlockingly(transform(hintedNodes, StorageService.instance::getHostIdForEndpoint));
-
- // once all generated hints are fsynced, actually delete the batches
- replayedBatches.forEach(BatchlogManager::deleteBatch);
- }
-
- private void finishAndClearBatches(ArrayList<Batch> batches, Set<InetAddress> hintedNodes, Set<UUID> replayedBatches)
- {
- // schedule hints for timed out deliveries
- for (Batch batch : batches)
- {
- batch.finish(hintedNodes);
- replayedBatches.add(batch.id);
- }
-
- totalBatchesReplayed += batches.size();
- batches.clear();
- }
-
- public static long getBatchlogTimeout()
- {
- return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
- }
-
- private static class Batch
- {
- private final UUID id;
- private final long writtenAt;
- private final ByteBuffer data;
- private final int version;
-
- private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
-
- Batch(UUID id, ByteBuffer data, int version)
- {
- this.id = id;
- this.writtenAt = UUIDGen.unixTimestamp(id);
- this.data = data;
- this.version = version;
- }
-
- public int replay(RateLimiter rateLimiter, Set<InetAddress> hintedNodes) throws IOException
- {
- logger.debug("Replaying batch {}", id);
-
- List<Mutation> mutations = replayingMutations();
-
- if (mutations.isEmpty())
- return 0;
-
- int gcgs = gcgs(mutations);
- if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
- return 0;
-
- replayHandlers = sendReplays(mutations, writtenAt, hintedNodes);
-
- rateLimiter.acquire(data.remaining()); // acquire afterwards, to not mess up ttl calculation.
-
- return replayHandlers.size();
- }
-
- public void finish(Set<InetAddress> hintedNodes)
- {
- for (int i = 0; i < replayHandlers.size(); i++)
- {
- ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
- try
- {
- handler.get();
- }
- catch (WriteTimeoutException|WriteFailureException e)
- {
- logger.debug("Failed replaying a batched mutation to a node, will write a hint");
- logger.debug("Failure was : {}", e.getMessage());
- // writing hints for the rest to hints, starting from i
- writeHintsForUndeliveredEndpoints(i, hintedNodes);
- return;
- }
- }
- }
-
- private List<Mutation> replayingMutations() throws IOException
- {
- DataInputPlus in = new DataInputBuffer(data, true);
- int size = in.readInt();
- List<Mutation> mutations = new ArrayList<>(size);
- for (int i = 0; i < size; i++)
- {
- Mutation mutation = Mutation.serializer.deserialize(in, version);
-
- // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
- // We don't abort the replay entirely b/c this can be considered a success (truncated is same as delivered then
- // truncated.
- for (UUID cfId : mutation.getColumnFamilyIds())
- if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
- mutation = mutation.without(cfId);
-
- if (!mutation.isEmpty())
- mutations.add(mutation);
- }
- return mutations;
- }
-
- private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddress> hintedNodes)
- {
- try
- {
- // Here we deserialize mutations 2nd time from byte buffer.
- // but this is ok, because timeout on batch direct delivery is rare
- // (it can happen only several seconds until node is marked dead)
- // so trading some cpu to keep less objects
- List<Mutation> replayingMutations = replayingMutations();
- for (int i = startFrom; i < replayHandlers.size(); i++)
- {
- Mutation undeliveredMutation = replayingMutations.get(i);
- int gcgs = gcgs(replayingMutations);
- ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
-
- if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs > FBUtilities.nowInSeconds() && handler != null)
- {
- hintedNodes.addAll(handler.undelivered);
- HintsService.instance.write(transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint),
- Hint.create(undeliveredMutation, writtenAt));
- }
- }
- }
- catch (IOException e)
- {
- logger.error("Cannot schedule hints for undelivered batch", e);
- }
- }
-
- private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations,
- long writtenAt,
- Set<InetAddress> hintedNodes)
- {
- List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
- for (Mutation mutation : mutations)
- {
- ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, hintedNodes);
- if (handler != null)
- handlers.add(handler);
- }
- return handlers;
- }
-
- /**
- * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
- * when a replica is down or a write request times out.
- *
- * @return direct delivery handler to wait on or null, if no live nodes found
- */
- private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation,
- long writtenAt,
- Set<InetAddress> hintedNodes)
- {
- Set<InetAddress> liveEndpoints = new HashSet<>();
- String ks = mutation.getKeyspaceName();
- Token tk = mutation.key().getToken();
-
- for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
- StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
- {
- if (endpoint.equals(FBUtilities.getBroadcastAddress()))
- {
- mutation.apply();
- }
- else if (FailureDetector.instance.isAlive(endpoint))
- {
- liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
- }
- else
- {
- hintedNodes.add(endpoint);
- HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(endpoint),
- Hint.create(mutation, writtenAt));
- }
- }
-
- if (liveEndpoints.isEmpty())
- return null;
-
- ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints);
- MessageOut<Mutation> message = mutation.createMessage();
- for (InetAddress endpoint : liveEndpoints)
- MessagingService.instance().sendRR(message, endpoint, handler, false);
- return handler;
- }
-
- private static int gcgs(Collection<Mutation> mutations)
- {
- int gcgs = Integer.MAX_VALUE;
- for (Mutation mutation : mutations)
- gcgs = Math.min(gcgs, mutation.smallestGCGS());
- return gcgs;
- }
-
- /**
- * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from
- * which we did not receive a successful reply.
- */
- private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
- {
- private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
-
- ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
- {
- super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
- undelivered.addAll(writeEndpoints);
- }
-
- @Override
- protected int totalBlockFor()
- {
- return this.naturalEndpoints.size();
- }
-
- @Override
- public void response(MessageIn<T> m)
- {
- boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from);
- assert removed;
- super.response(m);
- }
- }
- }
-
- @SuppressWarnings("deprecation")
- private static void convertOldBatchEntries()
- {
- logger.debug("Started convertOldBatchEntries");
-
- String query = String.format("SELECT id, data, written_at, version FROM %s.%s",
- SystemKeyspace.NAME,
- SystemKeyspace.LEGACY_BATCHLOG);
- UntypedResultSet batches = executeInternalWithPaging(query, DEFAULT_PAGE_SIZE);
- int convertedBatches = 0;
- for (UntypedResultSet.Row row : batches)
- {
- UUID id = row.getUUID("id");
- long timestamp = row.getLong("written_at");
- int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
- logger.debug("Converting mutation at " + timestamp);
-
- UUID newId = id;
- if (id.version() != 1 || timestamp != UUIDGen.unixTimestamp(id))
- newId = UUIDGen.getTimeUUID(timestamp, convertedBatches);
- ++convertedBatches;
-
- Mutation addRow = new RowUpdateBuilder(SystemKeyspace.Batches,
- FBUtilities.timestampMicros(),
- newId)
- .clustering()
- .add("data", row.getBytes("data"))
- .add("version", version)
- .build();
-
- addRow.apply();
- }
- if (convertedBatches > 0)
- Keyspace.openAndGetStore(SystemKeyspace.LegacyBatchlog).truncateBlocking();
- // cleanup will be called after replay
- logger.debug("Finished convertOldBatchEntries");
- }
-
- public static class EndpointFilter
- {
- private final String localRack;
- private final Multimap<String, InetAddress> endpoints;
-
- public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
- {
- this.localRack = localRack;
- this.endpoints = endpoints;
- }
-
- /**
- * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
- */
- public Collection<InetAddress> filter()
- {
- // special case for single-node data centers
- if (endpoints.values().size() == 1)
- return endpoints.values();
-
- // strip out dead endpoints and localhost
- ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
- for (Map.Entry<String, InetAddress> entry : endpoints.entries())
- if (isValid(entry.getValue()))
- validated.put(entry.getKey(), entry.getValue());
-
- if (validated.size() <= 2)
- return validated.values();
-
- if (validated.size() - validated.get(localRack).size() >= 2)
- {
- // we have enough endpoints in other racks
- validated.removeAll(localRack);
- }
-
- if (validated.keySet().size() == 1)
- {
- // we have only 1 `other` rack
- Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values());
- return Lists.newArrayList(Iterables.limit(otherRack, 2));
- }
-
- // randomize which racks we pick from if more than 2 remaining
- Collection<String> racks;
- if (validated.keySet().size() == 2)
- {
- racks = validated.keySet();
- }
- else
- {
- racks = Lists.newArrayList(validated.keySet());
- Collections.shuffle((List<String>) racks);
- }
-
- // grab a random member of up to two racks
- List<InetAddress> result = new ArrayList<>(2);
- for (String rack : Iterables.limit(racks, 2))
- {
- List<InetAddress> rackMembers = validated.get(rack);
- result.add(rackMembers.get(getRandomInt(rackMembers.size())));
- }
-
- return result;
- }
-
- @VisibleForTesting
- protected boolean isValid(InetAddress input)
- {
- return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
- }
-
- @VisibleForTesting
- protected int getRandomInt(int bound)
- {
- return ThreadLocalRandom.current().nextInt(bound);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java b/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
deleted file mode 100644
index a688117..0000000
--- a/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-public interface BatchlogManagerMBean
-{
- /**
- * Counts all batches currently in the batchlog.
- *
- * @return total batch count
- */
- public int countAllBatches();
-
- /**
- * @return total count of batches replayed since node start
- */
- public long getTotalBatchesReplayed();
-
- /**
- * Forces batchlog replay. Returns immediately if replay is already in progress.
- */
- public void forceBatchlogReplay() throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index d9ee38a..e349bfc 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -49,7 +49,7 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
{
public void run()
{
- MessagingService.instance().sendReply(new WriteResponse().createMessage(), id, message.from);
+ MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from);
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 6e78b0e..da7d13d 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -58,7 +58,7 @@ public class Mutation implements IMutation
public final long createdAt = System.currentTimeMillis();
public Mutation(String keyspaceName, DecoratedKey key)
{
- this(keyspaceName, key, new HashMap<UUID, PartitionUpdate>());
+ this(keyspaceName, key, new HashMap<>());
}
public Mutation(PartitionUpdate update)
@@ -201,6 +201,11 @@ public class Mutation implements IMutation
ks.apply(this, ks.getMetadata().params.durableWrites);
}
+ public void apply(boolean durableWrites)
+ {
+ Keyspace.open(keyspaceName).apply(this, durableWrites);
+ }
+
public void applyUnsafe()
{
Keyspace.open(keyspaceName).apply(this, false);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/MutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 640e45f..d4670a2 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -18,10 +18,10 @@
package org.apache.cassandra.db;
import java.io.DataInputStream;
-import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
+import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.*;
@@ -29,31 +29,32 @@ import org.apache.cassandra.tracing.Tracing;
public class MutationVerbHandler implements IVerbHandler<Mutation>
{
- private static final boolean TEST_FAIL_WRITES = System.getProperty("cassandra.test.fail_writes", "false").equalsIgnoreCase("true");
-
public void doVerb(MessageIn<Mutation> message, int id) throws IOException
{
- // Check if there were any forwarding headers in this message
- byte[] from = message.parameters.get(Mutation.FORWARD_FROM);
- InetAddress replyTo;
- if (from == null)
- {
- replyTo = message.from;
- byte[] forwardBytes = message.parameters.get(Mutation.FORWARD_TO);
- if (forwardBytes != null)
- forwardToLocalNodes(message.payload, message.verb, forwardBytes, message.from);
- }
- else
- {
- replyTo = InetAddress.getByAddress(from);
- }
+ // Check if there were any forwarding headers in this message
+ byte[] from = message.parameters.get(Mutation.FORWARD_FROM);
+ InetAddress replyTo;
+ if (from == null)
+ {
+ replyTo = message.from;
+ byte[] forwardBytes = message.parameters.get(Mutation.FORWARD_TO);
+ if (forwardBytes != null)
+ forwardToLocalNodes(message.payload, message.verb, forwardBytes, message.from);
+ }
+ else
+ {
+ replyTo = InetAddress.getByAddress(from);
+ }
try
{
- message.payload.apply();
- WriteResponse response = new WriteResponse();
+ if (message.version < MessagingService.VERSION_30 && LegacyBatchlogMigrator.isLegacyBatchlogMutation(message.payload))
+ LegacyBatchlogMigrator.handleLegacyMutation(message.payload);
+ else
+ message.payload.apply();
+
Tracing.trace("Enqueuing response to {}", replyTo);
- MessagingService.instance().sendReply(response.createMessage(), id, replyTo);
+ MessagingService.instance().sendReply(WriteResponse.createMessage(), id, replyTo);
}
catch (WriteTimeoutException wto)
{
@@ -65,7 +66,7 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
* Older version (< 1.0) will not send this message at all, hence we don't
* need to check the version of the data.
*/
- private void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
+ private static void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
{
try (DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes)))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
index 849ac70..2e499e7 100644
--- a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
@@ -26,7 +26,6 @@ public class ReadRepairVerbHandler implements IVerbHandler<Mutation>
public void doVerb(MessageIn<Mutation> message, int id)
{
message.payload.apply();
- WriteResponse response = new WriteResponse();
- MessagingService.instance().sendReply(response.createMessage(), id, message.from);
+ MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index fb9eb48..cf8e14d 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -113,7 +113,7 @@ public final class SystemKeyspace
"batches awaiting replay",
"CREATE TABLE %s ("
+ "id timeuuid,"
- + "data blob,"
+ + "mutations list<blob>,"
+ "version int,"
+ "PRIMARY KEY ((id)))")
.copy(new LocalPartitioner(TimeUUIDType.instance))