You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/10/27 21:36:57 UTC

[1/3] git commit: Add IRequestSink interface

Updated Branches:
  refs/heads/trunk 55a774875 -> 9fa54cf3b


Add IRequestSink interface

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-6248


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6b3fe5ee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6b3fe5ee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6b3fe5ee

Branch: refs/heads/trunk
Commit: 6b3fe5ee7ca333bf1d8cef1cb06e72f2b7ccef6a
Parents: 18260c5
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Oct 27 23:28:46 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Oct 27 23:28:46 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/net/MessagingService.java  |  20 +++-
 .../apache/cassandra/net/sink/IMessageSink.java |  42 --------
 .../apache/cassandra/net/sink/SinkManager.java  |  68 -------------
 .../apache/cassandra/service/StorageProxy.java  |  17 +++-
 .../org/apache/cassandra/sink/IMessageSink.java |  42 ++++++++
 .../org/apache/cassandra/sink/IRequestSink.java |  32 ++++++
 .../org/apache/cassandra/sink/SinkManager.java  | 100 +++++++++++++++++++
 .../cassandra/repair/DifferencerTest.java       |   4 +-
 .../apache/cassandra/repair/ValidatorTest.java  |   4 +-
 .../apache/cassandra/service/RemoveTest.java    |   2 +-
 11 files changed, 212 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 62c3f52..1052901 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
 2.0.3
  * Fix modifying column_metadata from thrift (CASSANDRA-6182)
  * cqlsh: fix LIST USERS output (CASSANDRA-6242)
+ * Add IRequestSink interface (CASSANDRA-6248)
 
 
 2.0.2

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index b66c8a4..6696e87 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -53,7 +53,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.locator.ILatencySubscriber;
 import org.apache.cassandra.metrics.ConnectionMetrics;
 import org.apache.cassandra.metrics.DroppedMessageMetrics;
-import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.sink.SinkManager;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.*;
@@ -708,9 +708,13 @@ public final class MessagingService implements MessagingServiceMBean
         if (state != null)
             state.trace("Message received from {}", message.from);
 
+        Verb verb = message.verb;
         message = SinkManager.processInboundMessage(message, id);
         if (message == null)
+        {
+            incrementRejectedMessages(verb);
             return;
+        }
 
         Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
         TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
@@ -798,6 +802,20 @@ public final class MessagingService implements MessagingServiceMBean
         droppedMessages.get(verb).dropped.mark();
     }
 
+    /**
+     * Same as incrementDroppedMessages(), but allows non-droppable verbs. Called for IMessageSink-caused message drops.
+     */
+    private void incrementRejectedMessages(Verb verb)
+    {
+        DroppedMessageMetrics metrics = droppedMessages.get(verb);
+        if (metrics == null)
+        {
+            metrics = new DroppedMessageMetrics(verb);
+            droppedMessages.put(verb, metrics);
+        }
+        metrics.dropped.mark();
+    }
+
     private void logDroppedMessages()
     {
         boolean logTpstats = false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/net/sink/IMessageSink.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/sink/IMessageSink.java b/src/java/org/apache/cassandra/net/sink/IMessageSink.java
deleted file mode 100644
index d6b6496..0000000
--- a/src/java/org/apache/cassandra/net/sink/IMessageSink.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net.sink;
-
-import java.net.InetAddress;
-
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-
-public interface IMessageSink
-{
-    /**
-     * Transform or drop an outgoing message
-     *
-     * @return null if the message is dropped, or the transformed message to send, which may be just
-     * the original message
-     */
-    public MessageOut handleMessage(MessageOut message, int id, InetAddress to);
-
-    /**
-     * Transform or drop an incoming message
-     *
-     * @return null if the message is dropped, or the transformed message to receive, which may be just
-     * the original message
-     */
-    public MessageIn handleMessage(MessageIn message, int id, InetAddress to);
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/net/sink/SinkManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/sink/SinkManager.java b/src/java/org/apache/cassandra/net/sink/SinkManager.java
deleted file mode 100644
index 7b67afe..0000000
--- a/src/java/org/apache/cassandra/net/sink/SinkManager.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net.sink;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-
-public class SinkManager
-{
-    private static final List<IMessageSink> sinks = new ArrayList<IMessageSink>();
-
-    public static void add(IMessageSink ms)
-    {
-        sinks.add(ms);
-    }
-
-    public static void clear()
-    {
-        sinks.clear();
-    }
-
-    public static MessageOut processOutboundMessage(MessageOut message, int id, InetAddress to)
-    {
-        if (sinks.isEmpty())
-            return message;
-
-        for (IMessageSink ms : sinks)
-        {
-            message = ms.handleMessage(message, id, to);
-            if (message == null)
-                return null;
-        }
-        return message;
-    }
-
-    public static MessageIn processInboundMessage(MessageIn message, int id)
-    {
-        if (sinks.isEmpty())
-            return message;
-
-        for (IMessageSink ms : sinks)
-        {
-            message = ms.handleMessage(message, id, null);
-            if (message == null)
-                return null;
-        }
-        return message;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 6dd702b..52a2a47 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -62,6 +62,7 @@ import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.service.paxos.*;
+import org.apache.cassandra.sink.SinkManager;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.triggers.TriggerExecutor;
 import org.apache.cassandra.utils.*;
@@ -992,8 +993,12 @@ public class StorageProxy implements StorageProxyMBean
         {
             public void runMayThrow()
             {
-                rm.apply();
-                responseHandler.response(null);
+                IMutation processed = SinkManager.processWriteRequest(rm);
+                if (processed != null)
+                {
+                    processed.apply();
+                    responseHandler.response(null);
+                }
             }
         };
         StageManager.getStage(Stage.MUTATION).execute(runnable);
@@ -1104,8 +1109,12 @@ public class StorageProxy implements StorageProxyMBean
         {
             public void runMayThrow()
             {
-                assert mutation instanceof CounterMutation;
-                final CounterMutation cm = (CounterMutation) mutation;
+                IMutation processed = SinkManager.processWriteRequest(mutation);
+                if (processed == null)
+                    return;
+
+                assert processed instanceof CounterMutation;
+                final CounterMutation cm = (CounterMutation) processed;
 
                 // apply mutation
                 cm.apply();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/sink/IMessageSink.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/sink/IMessageSink.java b/src/java/org/apache/cassandra/sink/IMessageSink.java
new file mode 100644
index 0000000..996e7ff
--- /dev/null
+++ b/src/java/org/apache/cassandra/sink/IMessageSink.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sink;
+
+import java.net.InetAddress;
+
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+
+public interface IMessageSink
+{
+    /**
+     * Transform or drop an outgoing message
+     *
+     * @return null if the message is dropped, or the transformed message to send, which may be just
+     * the original message
+     */
+    MessageOut handleMessage(MessageOut message, int id, InetAddress to);
+
+    /**
+     * Transform or drop an incoming message
+     *
+     * @return null if the message is dropped, or the transformed message to receive, which may be just
+     * the original message
+     */
+    MessageIn handleMessage(MessageIn message, int id, InetAddress to);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/sink/IRequestSink.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/sink/IRequestSink.java b/src/java/org/apache/cassandra/sink/IRequestSink.java
new file mode 100644
index 0000000..8d68ce8
--- /dev/null
+++ b/src/java/org/apache/cassandra/sink/IRequestSink.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sink;
+
+import org.apache.cassandra.db.IMutation;
+
+public interface IRequestSink
+{
+    /**
+     * Transform or drop a write request (represented by a RowMutation).
+     *
+     * @param mutation the RowMutation to be applied locally.
+     * @return null if the mutation is to be dropped, or the transformed mutation to apply, which may be just
+     * the original mutation.
+     */
+    IMutation handleWriteRequest(IMutation mutation);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/sink/SinkManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/sink/SinkManager.java b/src/java/org/apache/cassandra/sink/SinkManager.java
new file mode 100644
index 0000000..9b422dc
--- /dev/null
+++ b/src/java/org/apache/cassandra/sink/SinkManager.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sink;
+
+import java.net.InetAddress;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+
+public class SinkManager
+{
+    private static final Set<IMessageSink> messageSinks = new CopyOnWriteArraySet<>();
+    private static final Set<IRequestSink> requestSinks = new CopyOnWriteArraySet<>();
+
+    public static void add(IMessageSink ms)
+    {
+        messageSinks.add(ms);
+    }
+
+    public static void add(IRequestSink rs)
+    {
+        requestSinks.add(rs);
+    }
+
+    public static void remove(IMessageSink ms)
+    {
+        messageSinks.remove(ms);
+    }
+
+    public static void remove(IRequestSink rs)
+    {
+        requestSinks.remove(rs);
+    }
+
+    public static void clear()
+    {
+        messageSinks.clear();
+        requestSinks.clear();
+    }
+
+    public static MessageOut processOutboundMessage(MessageOut message, int id, InetAddress to)
+    {
+        if (messageSinks.isEmpty())
+            return message;
+
+        for (IMessageSink ms : messageSinks)
+        {
+            message = ms.handleMessage(message, id, to);
+            if (message == null)
+                return null;
+        }
+        return message;
+    }
+
+    public static MessageIn processInboundMessage(MessageIn message, int id)
+    {
+        if (messageSinks.isEmpty())
+            return message;
+
+        for (IMessageSink ms : messageSinks)
+        {
+            message = ms.handleMessage(message, id, null);
+            if (message == null)
+                return null;
+        }
+        return message;
+    }
+
+    public static IMutation processWriteRequest(IMutation mutation)
+    {
+        if (requestSinks.isEmpty())
+            return mutation;
+
+        for (IRequestSink rs : requestSinks)
+        {
+            mutation = rs.handleWriteRequest(mutation);
+            if (mutation == null)
+                return null;
+        }
+        return mutation;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/test/unit/org/apache/cassandra/repair/DifferencerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/DifferencerTest.java b/test/unit/org/apache/cassandra/repair/DifferencerTest.java
index 3f259f2..b6dce40 100644
--- a/test/unit/org/apache/cassandra/repair/DifferencerTest.java
+++ b/test/unit/org/apache/cassandra/repair/DifferencerTest.java
@@ -33,8 +33,8 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.sink.IMessageSink;
-import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.sink.IMessageSink;
+import org.apache.cassandra.sink.SinkManager;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.SyncComplete;
 import org.apache.cassandra.utils.MerkleTree;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index a6be1b1..9fa5d89 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -35,8 +35,8 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.sink.IMessageSink;
-import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.sink.IMessageSink;
+import org.apache.cassandra.sink.SinkManager;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.ValidationComplete;
 import org.apache.cassandra.service.StorageService;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 1b3eb48..62dd636 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -39,7 +39,7 @@ import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.sink.SinkManager;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;


[2/3] git commit: Update memtable size while flushing

Posted by al...@apache.org.
Update memtable size while flushing

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-6249


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d280e970
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d280e970
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d280e970

Branch: refs/heads/trunk
Commit: d280e970e934447f42144cd8651ea678497a9785
Parents: 6b3fe5e
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Oct 27 23:34:10 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Oct 27 23:34:10 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/ColumnFamily.java   |  8 ++++++++
 .../apache/cassandra/db/ColumnFamilyStore.java  | 20 +++++++++++++++++---
 .../org/apache/cassandra/db/DataTracker.java    |  9 +++++++++
 src/java/org/apache/cassandra/db/Memtable.java  |  7 ++++++-
 5 files changed, 41 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d280e970/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1052901..7e6ba95 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -2,6 +2,7 @@
  * Fix modifying column_metadata from thrift (CASSANDRA-6182)
  * cqlsh: fix LIST USERS output (CASSANDRA-6242)
  * Add IRequestSink interface (CASSANDRA-6248)
+ * Update memtable size while flushing (CASSANDRA-6249)
 
 
 2.0.2

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d280e970/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 4031ebc..7b5642a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -319,6 +319,14 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
         return ObjectSizes.measureDeep(this);
     }
 
+    public long dataSize()
+    {
+        long size = 0;
+        for (Column column : this)
+            size += column.dataSize();
+        return size;
+    }
+
     public long maxTimestamp()
     {
         long maxTimestamp = deletionInfo().maxTimestamp();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d280e970/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 597ca53..4346224 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -884,11 +884,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return removeDeletedCF(cf, gcBefore);
     }
 
-    private static void removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
+    private static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
     {
         Iterator<Column> iter = cf.iterator();
         DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();
         boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty();
+        long removedBytes = 0;
         while (iter.hasNext())
         {
             Column c = iter.next();
@@ -900,13 +901,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             {
                 iter.remove();
                 indexer.remove(c);
+                removedBytes += c.dataSize();
             }
         }
+        return removedBytes;
     }
 
-    public static void removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore)
+    public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore)
     {
-        removeDeletedColumnsOnly(cf, gcBefore, SecondaryIndexManager.nullUpdater);
+        return removeDeletedColumnsOnly(cf, gcBefore, SecondaryIndexManager.nullUpdater);
     }
 
     // returns true if
@@ -1094,6 +1097,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return getMemtableDataSize() + indexManager.getTotalLiveSize();
     }
 
+    /**
+     * @return the live size of all the memtables (the current active one and pending flush).
+     */
+    public long getAllMemtablesLiveSize()
+    {
+        long size = 0;
+        for (Memtable mt : getDataTracker().getAllMemtables())
+            size += mt.getLiveSize();
+        return size;
+    }
+
     public int getMemtableSwitchCount()
     {
         return (int) metric.memtableSwitchCount.count();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d280e970/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 365d607..1c25f44 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -62,6 +62,15 @@ public class DataTracker
         return view.get().memtablesPendingFlush;
     }
 
+    /**
+     * @return the active memtable and all the memtables that are pending flush.
+     */
+    public Iterable<Memtable> getAllMemtables()
+    {
+        View snapshot = view.get();
+        return Iterables.concat(snapshot.memtablesPendingFlush, Collections.singleton(snapshot.memtable));
+    }
+
     public Set<SSTableReader> getSSTables()
     {
         return view.get().sstables;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d280e970/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 9a8f810..12d36bf 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -233,6 +233,7 @@ public class Memtable
             private Iterator<Map.Entry<RowPosition, AtomicSortedColumns>> iter = stopAt.isMinimum(cfs.partitioner)
                                                                                ? rows.tailMap(startWith).entrySet().iterator()
                                                                                : rows.subMap(startWith, true, stopAt, true).entrySet().iterator();
+            private Map.Entry<RowPosition, AtomicSortedColumns> currentEntry;
 
             public boolean hasNext()
             {
@@ -242,6 +243,8 @@ public class Memtable
             public Map.Entry<DecoratedKey, AtomicSortedColumns> next()
             {
                 Map.Entry<RowPosition, AtomicSortedColumns> entry = iter.next();
+                // Store the reference to the current entry so that remove() can update the current size.
+                currentEntry = entry;
                 // Actual stored key should be true DecoratedKey
                 assert entry.getKey() instanceof DecoratedKey;
                 // Object cast is required since otherwise we can't turn RowPosition into DecoratedKey
@@ -251,6 +254,8 @@ public class Memtable
             public void remove()
             {
                 iter.remove();
+                currentSize.addAndGet(-currentEntry.getValue().dataSize());
+                currentEntry = null;
             }
         };
     }
@@ -355,7 +360,7 @@ public class Memtable
                         // the table has secondary indexes, or else the stale entries wouldn't be cleaned up during compaction,
                         // and will only be dropped during 2i query read-repair, if at all.
                         if (!cfs.indexManager.hasIndexes())
-                            ColumnFamilyStore.removeDeletedColumnsOnly(cf, Integer.MIN_VALUE);
+                            currentSize.addAndGet(-ColumnFamilyStore.removeDeletedColumnsOnly(cf, Integer.MIN_VALUE));
                     }
                     writer.append((DecoratedKey)entry.getKey(), cf);
                 }


[3/3] git commit: Merge branch 'cassandra-2.0' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9fa54cf3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9fa54cf3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9fa54cf3

Branch: refs/heads/trunk
Commit: 9fa54cf3b3656c37decc18767ff198df5e243074
Parents: 55a7748 d280e97
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Oct 27 23:35:17 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Oct 27 23:35:17 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/cassandra/db/ColumnFamily.java   |   8 ++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  20 +++-
 .../org/apache/cassandra/db/DataTracker.java    |   9 ++
 src/java/org/apache/cassandra/db/Memtable.java  |   7 +-
 .../apache/cassandra/net/MessagingService.java  |  20 +++-
 .../apache/cassandra/net/sink/IMessageSink.java |  42 --------
 .../apache/cassandra/net/sink/SinkManager.java  |  68 -------------
 .../apache/cassandra/service/StorageProxy.java  |  17 +++-
 .../org/apache/cassandra/sink/IMessageSink.java |  42 ++++++++
 .../org/apache/cassandra/sink/IRequestSink.java |  32 ++++++
 .../org/apache/cassandra/sink/SinkManager.java  | 100 +++++++++++++++++++
 .../cassandra/repair/DifferencerTest.java       |   4 +-
 .../apache/cassandra/repair/ValidatorTest.java  |   4 +-
 .../apache/cassandra/service/RemoveTest.java    |   2 +-
 15 files changed, 253 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fa54cf3/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fa54cf3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fa54cf3/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fa54cf3/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fa54cf3/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------