You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/08/13 18:58:49 UTC
svn commit: r985283 - in /cassandra/branches/cassandra-0.6: ./
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/net/
Author: jbellis
Date: Fri Aug 13 16:58:49 2010
New Revision: 985283
URL: http://svn.apache.org/viewvc?rev=985283&view=rev
Log:
remove message deserialization stage, and uncap read/write stages. patch by jbellis; reviewed by Stu Hood for CASSANDRA-1358
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=985283&r1=985282&r2=985283&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Fri Aug 13 16:58:49 2010
@@ -11,6 +11,8 @@
initialization (CASSANDRA-1377)
* fix errors in hard-coded bloom filter optKPerBucket by computing it
algorithmically (CASSANDRA-1220
+ * remove message deserialization stage, and uncap read/write stages
+ so slow reads/writes don't block gossip processing (CASSANDRA-1358)
0.6.4
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=985283&r1=985282&r2=985283&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java Fri Aug 13 16:58:49 2010
@@ -70,7 +70,7 @@ public class StageManager
numThreads,
Integer.MAX_VALUE,
TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getStageQueueSize()),
+ new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(name));
}
@@ -82,7 +82,7 @@ public class StageManager
numThreads,
Integer.MAX_VALUE,
TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getStageQueueSize()),
+ new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(name));
}
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=985283&r1=985282&r2=985283&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Aug 13 16:58:49 2010
@@ -156,8 +156,6 @@ public class DatabaseDescriptor
throw new RuntimeException("Cannot locate " + STORAGE_CONF_FILE + " via storage-config system property or classpath lookup.");
}
- private static int stageQueueSize_ = 4096;
-
static
{
try
@@ -1114,11 +1112,6 @@ public class DatabaseDescriptor
return getCFMetaData(tableName, cfName).subcolumnComparator;
}
- public static int getStageQueueSize()
- {
- return stageQueueSize_;
- }
-
/**
* @return The absolute number of keys that should be cached per table.
*/
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=985283&r1=985282&r2=985283&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Fri Aug 13 16:58:49 2010
@@ -26,6 +26,7 @@ import java.net.Socket;
import org.apache.log4j.Logger;
+import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.streaming.IncomingStreamReader;
public class IncomingTcpConnection extends Thread
@@ -70,7 +71,9 @@ public class IncomingTcpConnection exten
int size = input.readInt();
byte[] contentBytes = new byte[size];
input.readFully(contentBytes);
- MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes)));
+
+ Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes)));
+ MessagingService.receive(message);
}
}
catch (EOFException e)
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=985283&r1=985282&r2=985283&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Fri Aug 13 16:58:49 2010
@@ -20,13 +20,16 @@ package org.apache.cassandra.net;
import org.apache.log4j.Logger;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.service.StorageService;
public class MessageDeliveryTask implements Runnable
{
+ private static final Logger logger_ = Logger.getLogger(MessageDeliveryTask.class);
+
private Message message_;
- private static Logger logger_ = Logger.getLogger(MessageDeliveryTask.class);
-
+ private final long constructionTime_ = System.currentTimeMillis();
+
public MessageDeliveryTask(Message message)
{
message_ = message;
@@ -34,6 +37,12 @@ public class MessageDeliveryTask impleme
public void run()
{
+ if (System.currentTimeMillis() > constructionTime_ + DatabaseDescriptor.getRpcTimeout())
+ {
+ MessagingService.incrementDroppedMessages();
+ return;
+ }
+
StorageService.Verb verb = message_.getVerb();
IVerbHandler verbHandler = MessagingService.instance.getVerbHandler(verb);
assert verbHandler != null : "unknown verb " + verb;
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java?rev=985283&r1=985282&r2=985283&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java Fri Aug 13 16:58:49 2010
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.sink.SinkManager;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-class MessageDeserializationTask extends WrappedRunnable
-{
- private static Logger logger = Logger.getLogger(MessageDeserializationTask.class);
-
- private final ByteArrayInputStream bytes;
- private final long constructionTime = System.currentTimeMillis();
-
- MessageDeserializationTask(ByteArrayInputStream bytes)
- {
- this.bytes = bytes;
- }
-
- public void runMayThrow() throws IOException
- {
- if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
- {
- MessagingService.incrementDroppedMessages();
- return;
- }
-
- Message message = Message.serializer().deserialize(new DataInputStream(bytes));
- message = SinkManager.processServerMessageSink(message);
- MessagingService.receive(message);
- }
-}
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=985283&r1=985282&r2=985283&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Fri Aug 13 16:58:49 2010
@@ -18,21 +18,6 @@
package org.apache.cassandra.net;
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.gms.IFailureDetectionEventListener;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.net.io.SerializerType;
-import org.apache.cassandra.net.sink.SinkManager;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ExpiringMap;
-import org.apache.cassandra.utils.GuidGenerator;
-import org.apache.cassandra.utils.SimpleCondition;
-import org.apache.log4j.Logger;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
@@ -47,12 +32,24 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.io.SerializerType;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ExpiringMap;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.SimpleCondition;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
public class MessagingService
{
private static int version_ = 1;
@@ -69,8 +66,8 @@ public class MessagingService
/* Lookup table for registering message handlers based on the verb. */
private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
- /* Thread pool to handle deserialization of messages read from the socket. */
- private static ExecutorService messageDeserializerExecutor_;
+ /* Thread pool to handle messages without a specialized stage */
+ private static ExecutorService defaultExecutor_;
/* Thread pool to handle messaging write activities */
private static ExecutorService streamExecutor_;
@@ -105,13 +102,7 @@ public class MessagingService
callbackMap_ = new ExpiringMap<String, IAsyncCallback>( 2 * DatabaseDescriptor.getRpcTimeout() );
taskCompletionMap_ = new ExpiringMap<String, IAsyncResult>( 2 * DatabaseDescriptor.getRpcTimeout() );
- // read executor puts messages to deserialize on this.
- messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor(1,
- Runtime.getRuntime().availableProcessors(),
- Integer.MAX_VALUE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL"));
+ defaultExecutor_ = new JMXEnabledThreadPoolExecutor("MISCELLANEOUS-POOL");
streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
TimerTask logDropped = new TimerTask()
@@ -354,8 +345,8 @@ public class MessagingService
/** blocks until the processing pools are empty and done. */
public static void waitFor() throws InterruptedException
{
- while (!messageDeserializerExecutor_.isTerminated())
- messageDeserializerExecutor_.awaitTermination(5, TimeUnit.SECONDS);
+ while (!defaultExecutor_.isTerminated())
+ defaultExecutor_.awaitTermination(5, TimeUnit.SECONDS);
while (!streamExecutor_.isTerminated())
streamExecutor_.awaitTermination(5, TimeUnit.SECONDS);
}
@@ -373,7 +364,7 @@ public class MessagingService
throw new IOError(e);
}
- messageDeserializerExecutor_.shutdownNow();
+ defaultExecutor_.shutdownNow();
streamExecutor_.shutdownNow();
/* shut down the cachetables */
@@ -385,14 +376,16 @@ public class MessagingService
public static void receive(Message message)
{
- Runnable runnable = new MessageDeliveryTask(message);
+ message = SinkManager.processServerMessageSink(message);
+ Runnable runnable = new MessageDeliveryTask(message);
ExecutorService stage = StageManager.getStage(message.getMessageType());
+
if (stage == null)
{
if (logger_.isDebugEnabled())
logger_.debug("Running " + message.getMessageType() + " on default stage");
- messageDeserializerExecutor_.execute(runnable);
+ defaultExecutor_.execute(runnable);
}
else
{
@@ -425,11 +418,6 @@ public class MessagingService
return taskCompletionMap_.getAge(key);
}
- public static ExecutorService getDeserializationExecutor()
- {
- return messageDeserializerExecutor_;
- }
-
public static void validateMagic(int magic) throws IOException
{
if (magic != PROTOCOL_MAGIC)