You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/09/10 20:16:56 UTC
git commit: flesh out BatchlogManagerMBean - patch by Aleksey Yeschenko;
reviewed by jbellis for CASSANDRA-4635
Updated Branches:
refs/heads/trunk 02769d5b6 -> b2dcd9406
flesh out BatchlogManagerMBean
patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-4635
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b2dcd940
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b2dcd940
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b2dcd940
Branch: refs/heads/trunk
Commit: b2dcd9406b779a2ef28041263a3580156b015dfd
Parents: 02769d5
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Sep 10 13:15:21 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Sep 10 13:15:21 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../org/apache/cassandra/config/CFMetaData.java | 1 -
.../org/apache/cassandra/db/BatchlogManager.java | 107 +++++++++------
.../apache/cassandra/db/BatchlogManagerMBean.java | 16 ++
4 files changed, 82 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2dcd940/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 896b8cc..95a8b18 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,5 @@
1.2-beta1
- * add atomic_batch_mutate (CASSANDRA-4542)
+ * add atomic_batch_mutate (CASSANDRA-4542, -4635)
* increase default max_hint_window_in_ms to 3h (CASSANDRA-4632)
* include message initiation time to replicas so they can more
accurately drop timed-out requests (CASSANDRA-2858)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2dcd940/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 9ee684c..4e29fc7 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -194,7 +194,6 @@ public final class CFMetaData
public static final CFMetaData BatchlogCF = compile(16, "CREATE TABLE " + SystemTable.BATCHLOG_CF + " ("
+ "id uuid PRIMARY KEY,"
- + "coordinator inet,"
+ "written_at timestamp,"
+ "data blob"
+ ") WITH COMMENT='uncommited batches' AND gc_grace_seconds=0");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2dcd940/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 2ae9361..ded1ca4 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -23,6 +23,8 @@ import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -32,17 +34,16 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.IFilter;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.marshal.InetAddressType;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy;
@@ -54,17 +55,18 @@ public class BatchlogManager implements BatchlogManagerMBean
{
private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
private static final int VERSION = MessagingService.VERSION_12;
- private static final long TIMEOUT = 2 * DatabaseDescriptor.getRpcTimeout();
+ private static final long TIMEOUT = 2 * DatabaseDescriptor.getWriteRpcTimeout();
- private static final ByteBuffer COORDINATOR = columnName("coordinator");
private static final ByteBuffer WRITTEN_AT = columnName("written_at");
private static final ByteBuffer DATA = columnName("data");
- private static final SortedSet<ByteBuffer> META = ImmutableSortedSet.of(COORDINATOR, WRITTEN_AT);
private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
public static final BatchlogManager instance = new BatchlogManager();
+ private final AtomicLong totalBatchesReplayed = new AtomicLong();
+ private final AtomicBoolean isReplaying = new AtomicBoolean();
+
public void start()
{
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -90,15 +92,43 @@ public class BatchlogManager implements BatchlogManagerMBean
TimeUnit.MILLISECONDS);
}
+ public int countAllBatches()
+ {
+ int count = 0;
+
+ for (Row row : getRangeSlice(new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of())))
+ {
+ if (row.cf != null && !row.cf.isMarkedForDelete())
+ count++;
+ }
+
+ return count;
+ }
+
+ public long getTotalBatchesReplayed()
+ {
+ return totalBatchesReplayed.longValue();
+ }
+
+ public void forceBatchlogReplay()
+ {
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ replayAllFailedBatches();
+ }
+ };
+ StorageService.optionalTasks.execute(runnable);
+ }
+
public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid)
{
long timestamp = FBUtilities.timestampMicros();
- ByteBuffer coordinator = InetAddressType.instance.decompose(FBUtilities.getBroadcastAddress());
ByteBuffer writtenAt = LongType.instance.decompose(timestamp / 1000);
ByteBuffer data = serializeRowMutations(mutations);
ColumnFamily cf = ColumnFamily.create(CFMetaData.BatchlogCF);
- cf.addColumn(new Column(COORDINATOR, coordinator, timestamp));
cf.addColumn(new Column(WRITTEN_AT, writtenAt, timestamp));
cf.addColumn(new Column(DATA, data, timestamp));
RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
@@ -126,55 +156,38 @@ public class BatchlogManager implements BatchlogManagerMBean
return ByteBuffer.wrap(bos.toByteArray());
}
- private static void replayAllFailedBatches()
+ private void replayAllFailedBatches()
{
- if (logger.isDebugEnabled())
- logger.debug("Started replayAllFailedBatches");
-
- ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
-
- if (store.isEmpty())
+ if (!isReplaying.compareAndSet(false, true))
return;
- IPartitioner partitioner = StorageService.getPartitioner();
- RowPosition minPosition = partitioner.getMinimumToken().minKeyBound();
- AbstractBounds<RowPosition> range = new Range<RowPosition>(minPosition, minPosition, partitioner);
-
- List<Row> rows = store.getRangeSlice(null, range, Integer.MAX_VALUE, new NamesQueryFilter(META), null);
-
- for (Row row : rows)
+ try
{
- if (row.cf.isMarkedForDelete())
- continue;
-
- IColumn coordinatorColumn = row.cf.getColumn(COORDINATOR);
- IColumn writtenAtColumn = row.cf.getColumn(WRITTEN_AT);
+ logger.debug("Started replayAllFailedBatches");
- if (coordinatorColumn == null || writtenAtColumn == null)
+ for (Row row : getRangeSlice(new NamesQueryFilter(WRITTEN_AT)))
{
- replayBatch(row.key);
- continue;
- }
+ if (row.cf == null || row.cf.isMarkedForDelete())
+ continue;
- InetAddress coordinator = InetAddressType.instance.compose(coordinatorColumn.value());
- long writtenAt = LongType.instance.compose(writtenAtColumn.value());
- // if the batch is new and its coordinator is alive - give it a chance to complete naturally.
- if (System.currentTimeMillis() < writtenAt + TIMEOUT && FailureDetector.instance.isAlive(coordinator))
- continue;
-
- replayBatch(row.key);
+ IColumn writtenAt = row.cf.getColumn(WRITTEN_AT);
+ if (writtenAt == null || System.currentTimeMillis() > LongType.instance.compose(writtenAt.value()) + TIMEOUT)
+ replayBatch(row.key);
+ }
+ }
+ finally
+ {
+ isReplaying.set(false);
}
- if (logger.isDebugEnabled())
- logger.debug("Finished replayAllFailedBatches");
+ logger.debug("Finished replayAllFailedBatches");
}
- private static void replayBatch(DecoratedKey key)
+ private void replayBatch(DecoratedKey key)
{
UUID uuid = UUIDType.instance.compose(key.key);
- if (logger.isDebugEnabled())
- logger.debug("Replaying batch {}", uuid);
+ logger.debug("Replaying batch {}", uuid);
ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
QueryFilter filter = QueryFilter.getNamesFilter(key, new QueryPath(SystemTable.BATCHLOG_CF), DATA);
@@ -195,6 +208,7 @@ public class BatchlogManager implements BatchlogManagerMBean
}
deleteBatch(key);
+ totalBatchesReplayed.incrementAndGet();
}
private static void writeHintsForSerializedMutations(ByteBuffer data) throws IOException
@@ -228,4 +242,13 @@ public class BatchlogManager implements BatchlogManagerMBean
ByteBuffer raw = UTF8Type.instance.decompose(name);
return CFMetaData.BatchlogCF.getCfDef().getColumnNameBuilder().add(raw).build();
}
+
+ private static List<Row> getRangeSlice(IFilter columnFilter)
+ {
+ ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
+ IPartitioner partitioner = StorageService.getPartitioner();
+ RowPosition minPosition = partitioner.getMinimumToken().minKeyBound();
+ AbstractBounds<RowPosition> range = new Range<RowPosition>(minPosition, minPosition, partitioner);
+ return store.getRangeSlice(null, range, Integer.MAX_VALUE, columnFilter, null);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2dcd940/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java b/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
index 0322b21..2e60ba4 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
@@ -19,4 +19,20 @@ package org.apache.cassandra.db;
public interface BatchlogManagerMBean
{
+ /**
+ * Counts all batches currently in the batchlog.
+ *
+ * @return total batch count
+ */
+ public int countAllBatches();
+
+ /**
+ * @return total count of batches replayed since node start
+ */
+ public long getTotalBatchesReplayed();
+
+ /**
+ * Forces batchlog replay. Returns immediately if replay is already in progress.
+ */
+ public void forceBatchlogReplay();
}