You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/09/10 20:16:56 UTC

git commit: flesh out BatchlogManagerMBean — patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-4635

Updated Branches:
  refs/heads/trunk 02769d5b6 -> b2dcd9406


flesh out BatchlogManagerMBean
patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-4635


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b2dcd940
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b2dcd940
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b2dcd940

Branch: refs/heads/trunk
Commit: b2dcd9406b779a2ef28041263a3580156b015dfd
Parents: 02769d5
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Sep 10 13:15:21 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Sep 10 13:15:21 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +-
 .../org/apache/cassandra/config/CFMetaData.java    |    1 -
 .../org/apache/cassandra/db/BatchlogManager.java   |  107 +++++++++------
 .../apache/cassandra/db/BatchlogManagerMBean.java  |   16 ++
 4 files changed, 82 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2dcd940/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 896b8cc..95a8b18 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,5 @@
 1.2-beta1
- * add atomic_batch_mutate (CASSANDRA-4542)
+ * add atomic_batch_mutate (CASSANDRA-4542, -4635)
  * increase default max_hint_window_in_ms to 3h (CASSANDRA-4632)
  * include message initiation time to replicas so they can more
    accurately drop timed-out requests (CASSANDRA-2858)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2dcd940/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 9ee684c..4e29fc7 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -194,7 +194,6 @@ public final class CFMetaData
 
     public static final CFMetaData BatchlogCF = compile(16, "CREATE TABLE " + SystemTable.BATCHLOG_CF + " ("
                                                             + "id uuid PRIMARY KEY,"
-                                                            + "coordinator inet,"
                                                             + "written_at timestamp,"
                                                             + "data blob"
                                                             + ") WITH COMMENT='uncommited batches' AND gc_grace_seconds=0");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2dcd940/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 2ae9361..ded1ca4 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -23,6 +23,8 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -32,17 +34,16 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.IFilter;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.marshal.InetAddressType;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageProxy;
@@ -54,17 +55,18 @@ public class BatchlogManager implements BatchlogManagerMBean
 {
     private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
     private static final int VERSION = MessagingService.VERSION_12;
-    private static final long TIMEOUT = 2 * DatabaseDescriptor.getRpcTimeout();
+    private static final long TIMEOUT = 2 * DatabaseDescriptor.getWriteRpcTimeout();
 
-    private static final ByteBuffer COORDINATOR = columnName("coordinator");
     private static final ByteBuffer WRITTEN_AT = columnName("written_at");
     private static final ByteBuffer DATA = columnName("data");
-    private static final SortedSet<ByteBuffer> META = ImmutableSortedSet.of(COORDINATOR, WRITTEN_AT);
 
     private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
 
     public static final BatchlogManager instance = new BatchlogManager();
 
+    private final AtomicLong totalBatchesReplayed = new AtomicLong();
+    private final AtomicBoolean isReplaying = new AtomicBoolean();
+
     public void start()
     {
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -90,15 +92,43 @@ public class BatchlogManager implements BatchlogManagerMBean
                                                             TimeUnit.MILLISECONDS);
     }
 
+    public int countAllBatches()
+    {
+        int count = 0;
+
+        for (Row row : getRangeSlice(new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of())))
+        {
+            if (row.cf != null && !row.cf.isMarkedForDelete())
+                count++;
+        }
+
+        return count;
+    }
+
+    public long getTotalBatchesReplayed()
+    {
+        return totalBatchesReplayed.longValue();
+    }
+
+    public void forceBatchlogReplay()
+    {
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                replayAllFailedBatches();
+            }
+        };
+        StorageService.optionalTasks.execute(runnable);
+    }
+
     public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid)
     {
         long timestamp = FBUtilities.timestampMicros();
-        ByteBuffer coordinator = InetAddressType.instance.decompose(FBUtilities.getBroadcastAddress());
         ByteBuffer writtenAt = LongType.instance.decompose(timestamp / 1000);
         ByteBuffer data = serializeRowMutations(mutations);
 
         ColumnFamily cf = ColumnFamily.create(CFMetaData.BatchlogCF);
-        cf.addColumn(new Column(COORDINATOR, coordinator, timestamp));
         cf.addColumn(new Column(WRITTEN_AT, writtenAt, timestamp));
         cf.addColumn(new Column(DATA, data, timestamp));
         RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
@@ -126,55 +156,38 @@ public class BatchlogManager implements BatchlogManagerMBean
         return ByteBuffer.wrap(bos.toByteArray());
     }
 
-    private static void replayAllFailedBatches()
+    private void replayAllFailedBatches()
     {
-        if (logger.isDebugEnabled())
-            logger.debug("Started replayAllFailedBatches");
-
-        ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
-
-        if (store.isEmpty())
+        if (!isReplaying.compareAndSet(false, true))
             return;
 
-        IPartitioner partitioner = StorageService.getPartitioner();
-        RowPosition minPosition = partitioner.getMinimumToken().minKeyBound();
-        AbstractBounds<RowPosition> range = new Range<RowPosition>(minPosition, minPosition, partitioner);
-
-        List<Row> rows = store.getRangeSlice(null, range, Integer.MAX_VALUE, new NamesQueryFilter(META), null);
-
-        for (Row row : rows)
+        try
         {
-            if (row.cf.isMarkedForDelete())
-                continue;
-
-            IColumn coordinatorColumn = row.cf.getColumn(COORDINATOR);
-            IColumn writtenAtColumn = row.cf.getColumn(WRITTEN_AT);
+            logger.debug("Started replayAllFailedBatches");
 
-            if (coordinatorColumn == null || writtenAtColumn == null)
+            for (Row row : getRangeSlice(new NamesQueryFilter(WRITTEN_AT)))
             {
-                replayBatch(row.key);
-                continue;
-            }
+                if (row.cf == null || row.cf.isMarkedForDelete())
+                    continue;
 
-            InetAddress coordinator = InetAddressType.instance.compose(coordinatorColumn.value());
-            long writtenAt = LongType.instance.compose(writtenAtColumn.value());
-            // if the batch is new and its coordinator is alive - give it a chance to complete naturally.
-            if (System.currentTimeMillis() < writtenAt + TIMEOUT && FailureDetector.instance.isAlive(coordinator))
-                continue;
-
-            replayBatch(row.key);
+                IColumn writtenAt = row.cf.getColumn(WRITTEN_AT);
+                if (writtenAt == null || System.currentTimeMillis() > LongType.instance.compose(writtenAt.value()) + TIMEOUT)
+                    replayBatch(row.key);
+            }
+        }
+        finally
+        {
+            isReplaying.set(false);
         }
 
-        if (logger.isDebugEnabled())
-            logger.debug("Finished replayAllFailedBatches");
+        logger.debug("Finished replayAllFailedBatches");
     }
 
-    private static void replayBatch(DecoratedKey key)
+    private void replayBatch(DecoratedKey key)
     {
         UUID uuid = UUIDType.instance.compose(key.key);
 
-        if (logger.isDebugEnabled())
-            logger.debug("Replaying batch {}", uuid);
+        logger.debug("Replaying batch {}", uuid);
 
         ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
         QueryFilter filter = QueryFilter.getNamesFilter(key, new QueryPath(SystemTable.BATCHLOG_CF), DATA);
@@ -195,6 +208,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         }
 
         deleteBatch(key);
+        totalBatchesReplayed.incrementAndGet();
     }
 
     private static void writeHintsForSerializedMutations(ByteBuffer data) throws IOException
@@ -228,4 +242,13 @@ public class BatchlogManager implements BatchlogManagerMBean
         ByteBuffer raw = UTF8Type.instance.decompose(name);
         return CFMetaData.BatchlogCF.getCfDef().getColumnNameBuilder().add(raw).build();
     }
+
+    private static List<Row> getRangeSlice(IFilter columnFilter)
+    {
+        ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
+        IPartitioner partitioner = StorageService.getPartitioner();
+        RowPosition minPosition = partitioner.getMinimumToken().minKeyBound();
+        AbstractBounds<RowPosition> range = new Range<RowPosition>(minPosition, minPosition, partitioner);
+        return store.getRangeSlice(null, range, Integer.MAX_VALUE, columnFilter, null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2dcd940/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java b/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
index 0322b21..2e60ba4 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
@@ -19,4 +19,20 @@ package org.apache.cassandra.db;
 
 public interface BatchlogManagerMBean
 {
+    /**
+     * Counts all batches currently in the batchlog.
+     *
+     * @return total batch count
+     */
+    public int countAllBatches();
+
+    /**
+     * @return total count of batches replayed since node start
+     */
+    public long getTotalBatchesReplayed();
+
+    /**
+     * Forces batchlog replay. Returns immediately if replay is already in progress.
+     */
+    public void forceBatchlogReplay();
 }