You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/10/27 21:30:25 UTC
git commit: Add IRequestSink interface
Updated Branches:
refs/heads/cassandra-2.0 18260c5f2 -> 6b3fe5ee7
Add IRequestSink interface
patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-6248
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6b3fe5ee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6b3fe5ee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6b3fe5ee
Branch: refs/heads/cassandra-2.0
Commit: 6b3fe5ee7ca333bf1d8cef1cb06e72f2b7ccef6a
Parents: 18260c5
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Oct 27 23:28:46 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Oct 27 23:28:46 2013 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/net/MessagingService.java | 20 +++-
.../apache/cassandra/net/sink/IMessageSink.java | 42 --------
.../apache/cassandra/net/sink/SinkManager.java | 68 -------------
.../apache/cassandra/service/StorageProxy.java | 17 +++-
.../org/apache/cassandra/sink/IMessageSink.java | 42 ++++++++
.../org/apache/cassandra/sink/IRequestSink.java | 32 ++++++
.../org/apache/cassandra/sink/SinkManager.java | 100 +++++++++++++++++++
.../cassandra/repair/DifferencerTest.java | 4 +-
.../apache/cassandra/repair/ValidatorTest.java | 4 +-
.../apache/cassandra/service/RemoveTest.java | 2 +-
11 files changed, 212 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 62c3f52..1052901 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
2.0.3
* Fix modifying column_metadata from thrift (CASSANDRA-6182)
* cqlsh: fix LIST USERS output (CASSANDRA-6242)
+ * Add IRequestSink interface (CASSANDRA-6248)
2.0.2
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index b66c8a4..6696e87 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -53,7 +53,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.locator.ILatencySubscriber;
import org.apache.cassandra.metrics.ConnectionMetrics;
import org.apache.cassandra.metrics.DroppedMessageMetrics;
-import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.sink.SinkManager;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.*;
@@ -708,9 +708,13 @@ public final class MessagingService implements MessagingServiceMBean
if (state != null)
state.trace("Message received from {}", message.from);
+ Verb verb = message.verb;
message = SinkManager.processInboundMessage(message, id);
if (message == null)
+ {
+ incrementRejectedMessages(verb);
return;
+ }
Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
@@ -798,6 +802,20 @@ public final class MessagingService implements MessagingServiceMBean
droppedMessages.get(verb).dropped.mark();
}
+ /**
+ * Same as incrementDroppedMessages(), but allows non-droppable verbs. Called for IMessageSink-caused message drops.
+ */
+ private void incrementRejectedMessages(Verb verb)
+ {
+ DroppedMessageMetrics metrics = droppedMessages.get(verb);
+ if (metrics == null)
+ {
+ metrics = new DroppedMessageMetrics(verb);
+ droppedMessages.put(verb, metrics);
+ }
+ metrics.dropped.mark();
+ }
+
private void logDroppedMessages()
{
boolean logTpstats = false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/net/sink/IMessageSink.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/sink/IMessageSink.java b/src/java/org/apache/cassandra/net/sink/IMessageSink.java
deleted file mode 100644
index d6b6496..0000000
--- a/src/java/org/apache/cassandra/net/sink/IMessageSink.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net.sink;
-
-import java.net.InetAddress;
-
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-
-public interface IMessageSink
-{
- /**
- * Transform or drop an outgoing message
- *
- * @return null if the message is dropped, or the transformed message to send, which may be just
- * the original message
- */
- public MessageOut handleMessage(MessageOut message, int id, InetAddress to);
-
- /**
- * Transform or drop an incoming message
- *
- * @return null if the message is dropped, or the transformed message to receive, which may be just
- * the original message
- */
- public MessageIn handleMessage(MessageIn message, int id, InetAddress to);
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/net/sink/SinkManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/sink/SinkManager.java b/src/java/org/apache/cassandra/net/sink/SinkManager.java
deleted file mode 100644
index 7b67afe..0000000
--- a/src/java/org/apache/cassandra/net/sink/SinkManager.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net.sink;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-
-public class SinkManager
-{
- private static final List<IMessageSink> sinks = new ArrayList<IMessageSink>();
-
- public static void add(IMessageSink ms)
- {
- sinks.add(ms);
- }
-
- public static void clear()
- {
- sinks.clear();
- }
-
- public static MessageOut processOutboundMessage(MessageOut message, int id, InetAddress to)
- {
- if (sinks.isEmpty())
- return message;
-
- for (IMessageSink ms : sinks)
- {
- message = ms.handleMessage(message, id, to);
- if (message == null)
- return null;
- }
- return message;
- }
-
- public static MessageIn processInboundMessage(MessageIn message, int id)
- {
- if (sinks.isEmpty())
- return message;
-
- for (IMessageSink ms : sinks)
- {
- message = ms.handleMessage(message, id, null);
- if (message == null)
- return null;
- }
- return message;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 6dd702b..52a2a47 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -62,6 +62,7 @@ import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.net.*;
import org.apache.cassandra.service.paxos.*;
+import org.apache.cassandra.sink.SinkManager;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.triggers.TriggerExecutor;
import org.apache.cassandra.utils.*;
@@ -992,8 +993,12 @@ public class StorageProxy implements StorageProxyMBean
{
public void runMayThrow()
{
- rm.apply();
- responseHandler.response(null);
+ IMutation processed = SinkManager.processWriteRequest(rm);
+ if (processed != null)
+ {
+ processed.apply();
+ responseHandler.response(null);
+ }
}
};
StageManager.getStage(Stage.MUTATION).execute(runnable);
@@ -1104,8 +1109,12 @@ public class StorageProxy implements StorageProxyMBean
{
public void runMayThrow()
{
- assert mutation instanceof CounterMutation;
- final CounterMutation cm = (CounterMutation) mutation;
+ IMutation processed = SinkManager.processWriteRequest(mutation);
+ if (processed == null)
+ return;
+
+ assert processed instanceof CounterMutation;
+ final CounterMutation cm = (CounterMutation) processed;
// apply mutation
cm.apply();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/sink/IMessageSink.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/sink/IMessageSink.java b/src/java/org/apache/cassandra/sink/IMessageSink.java
new file mode 100644
index 0000000..996e7ff
--- /dev/null
+++ b/src/java/org/apache/cassandra/sink/IMessageSink.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sink;
+
+import java.net.InetAddress;
+
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+
+public interface IMessageSink
+{
+ /**
+ * Transform or drop an outgoing message
+ *
+ * @return null if the message is dropped, or the transformed message to send, which may be just
+ * the original message
+ */
+ MessageOut handleMessage(MessageOut message, int id, InetAddress to);
+
+ /**
+ * Transform or drop an incoming message
+ *
+ * @return null if the message is dropped, or the transformed message to receive, which may be just
+ * the original message
+ */
+ MessageIn handleMessage(MessageIn message, int id, InetAddress to);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/sink/IRequestSink.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/sink/IRequestSink.java b/src/java/org/apache/cassandra/sink/IRequestSink.java
new file mode 100644
index 0000000..8d68ce8
--- /dev/null
+++ b/src/java/org/apache/cassandra/sink/IRequestSink.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sink;
+
+import org.apache.cassandra.db.IMutation;
+
+public interface IRequestSink
+{
+ /**
+ * Transform or drop a write request (represented by a RowMutation).
+ *
+ * @param mutation the RowMutation to be applied locally.
+ * @return null if the mutation is to be dropped, or the transformed mutation to apply, which may be just
+ * the original mutation.
+ */
+ IMutation handleWriteRequest(IMutation mutation);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/sink/SinkManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/sink/SinkManager.java b/src/java/org/apache/cassandra/sink/SinkManager.java
new file mode 100644
index 0000000..9b422dc
--- /dev/null
+++ b/src/java/org/apache/cassandra/sink/SinkManager.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sink;
+
+import java.net.InetAddress;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+
+public class SinkManager
+{
+ private static final Set<IMessageSink> messageSinks = new CopyOnWriteArraySet<>();
+ private static final Set<IRequestSink> requestSinks = new CopyOnWriteArraySet<>();
+
+ public static void add(IMessageSink ms)
+ {
+ messageSinks.add(ms);
+ }
+
+ public static void add(IRequestSink rs)
+ {
+ requestSinks.add(rs);
+ }
+
+ public static void remove(IMessageSink ms)
+ {
+ messageSinks.remove(ms);
+ }
+
+ public static void remove(IRequestSink rs)
+ {
+ requestSinks.remove(rs);
+ }
+
+ public static void clear()
+ {
+ messageSinks.clear();
+ requestSinks.clear();
+ }
+
+ public static MessageOut processOutboundMessage(MessageOut message, int id, InetAddress to)
+ {
+ if (messageSinks.isEmpty())
+ return message;
+
+ for (IMessageSink ms : messageSinks)
+ {
+ message = ms.handleMessage(message, id, to);
+ if (message == null)
+ return null;
+ }
+ return message;
+ }
+
+ public static MessageIn processInboundMessage(MessageIn message, int id)
+ {
+ if (messageSinks.isEmpty())
+ return message;
+
+ for (IMessageSink ms : messageSinks)
+ {
+ message = ms.handleMessage(message, id, null);
+ if (message == null)
+ return null;
+ }
+ return message;
+ }
+
+ public static IMutation processWriteRequest(IMutation mutation)
+ {
+ if (requestSinks.isEmpty())
+ return mutation;
+
+ for (IRequestSink rs : requestSinks)
+ {
+ mutation = rs.handleWriteRequest(mutation);
+ if (mutation == null)
+ return null;
+ }
+ return mutation;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/test/unit/org/apache/cassandra/repair/DifferencerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/DifferencerTest.java b/test/unit/org/apache/cassandra/repair/DifferencerTest.java
index 3f259f2..b6dce40 100644
--- a/test/unit/org/apache/cassandra/repair/DifferencerTest.java
+++ b/test/unit/org/apache/cassandra/repair/DifferencerTest.java
@@ -33,8 +33,8 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.sink.IMessageSink;
-import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.sink.IMessageSink;
+import org.apache.cassandra.sink.SinkManager;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.SyncComplete;
import org.apache.cassandra.utils.MerkleTree;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index a6be1b1..9fa5d89 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -35,8 +35,8 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.sink.IMessageSink;
-import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.sink.IMessageSink;
+import org.apache.cassandra.sink.SinkManager;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.ValidationComplete;
import org.apache.cassandra.service.StorageService;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 1b3eb48..62dd636 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -39,7 +39,7 @@ import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.sink.SinkManager;
import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;