You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/12 00:36:38 UTC
svn commit: r1057930 - in /cassandra/branches/cassandra-0.7:
src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/streaming/
test/unit/org/apache/cassandra/service/
Author: jbellis
Date: Tue Jan 11 23:36:37 2011
New Revision: 1057930
URL: http://svn.apache.org/viewvc?rev=1057930&view=rev
Log:
convert MessagingService into a true singleton
patch by Folke Behrens; reviewed by jbellis for CASSANDRA-1959
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/FileStreamTask.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1057930&r1=1057929&r2=1057930&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java Tue Jan 11 23:36:37 2011
@@ -95,7 +95,7 @@ class AsyncResult implements IAsyncResul
lock.unlock();
}
- MessagingService.removeRegisteredCallback(response.getMessageId());
+ MessagingService.instance().removeRegisteredCallback(response.getMessageId());
}
public InetAddress getFrom()
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1057930&r1=1057929&r2=1057930&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Jan 11 23:36:37 2011
@@ -88,7 +88,7 @@ public class IncomingTcpConnection exten
input.readFully(contentBytes);
Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes)));
- MessagingService.receive(message);
+ MessagingService.instance().receive(message);
}
// prepare to read the next message
MessagingService.validateMagic(input.readInt());
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=1057930&r1=1057929&r2=1057930&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Tue Jan 11 23:36:37 2011
@@ -50,7 +50,7 @@ public class MessageDeliveryTask impleme
case REQUEST_RESPONSE:
if (System.currentTimeMillis() > constructionTime_ + DatabaseDescriptor.getRpcTimeout())
{
- MessagingService.incrementDroppedMessages(verb);
+ MessagingService.instance().incrementDroppedMessages(verb);
return;
}
break;
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1057930&r1=1057929&r2=1057930&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java Tue Jan 11 23:36:37 2011
@@ -61,36 +61,35 @@ import org.apache.cassandra.utils.Simple
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
-public class MessagingService implements MessagingServiceMBean, ILatencyPublisher
+public final class MessagingService implements MessagingServiceMBean, ILatencyPublisher
{
- private static int version_ = 1;
+ private static final int version_ = 1;
//TODO: make this parameter dynamic somehow. Not sure if config is appropriate.
- private static SerializerType serializerType_ = SerializerType.BINARY;
+ private SerializerType serializerType_ = SerializerType.BINARY;
/** we preface every message with this number so the recipient can validate the sender is sane */
- public static final int PROTOCOL_MAGIC = 0xCA552DFA;
+ private static final int PROTOCOL_MAGIC = 0xCA552DFA;
/* This records all the results mapped by message Id */
- private static ExpiringMap<String, IMessageCallback> callbacks;
- private static ConcurrentMap<String, Collection<InetAddress>> targets = new NonBlockingHashMap<String, Collection<InetAddress>>();
+ private final ExpiringMap<String, IMessageCallback> callbacks;
+ private final ConcurrentMap<String, Collection<InetAddress>> targets = new NonBlockingHashMap<String, Collection<InetAddress>>();
/* Lookup table for registering message handlers based on the verb. */
- private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
+ private final Map<StorageService.Verb, IVerbHandler> verbHandlers_;
/* Thread pool to handle messaging write activities */
- private static ExecutorService streamExecutor_;
+ private final ExecutorService streamExecutor_;
- private static NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
+ private final NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
- private static Logger logger_ = LoggerFactory.getLogger(MessagingService.class);
- private static int LOG_DROPPED_INTERVAL_IN_MS = 5000;
+ private static final Logger logger_ = LoggerFactory.getLogger(MessagingService.class);
+ private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000;
private SocketThread socketThread;
- private SimpleCondition listenGate;
- private static final Map<StorageService.Verb, AtomicInteger> droppedMessages = new EnumMap<StorageService.Verb, AtomicInteger>(StorageService.Verb.class);
+ private final SimpleCondition listenGate;
+ private final Map<StorageService.Verb, AtomicInteger> droppedMessages = new EnumMap<StorageService.Verb, AtomicInteger>(StorageService.Verb.class);
private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
- static
{
for (StorageService.Verb verb : StorageService.Verb.values())
droppedMessages.put(verb, new AtomicInteger());
@@ -105,13 +104,7 @@ public class MessagingService implements
return MSHandle.instance;
}
- public Object clone() throws CloneNotSupportedException
- {
- //Prevents the singleton from being cloned
- throw new CloneNotSupportedException();
- }
-
- protected MessagingService()
+ private MessagingService()
{
listenGate = new SimpleCondition();
verbHandlers_ = new EnumMap<StorageService.Verb, IVerbHandler>(StorageService.Verb.class);
@@ -155,7 +148,7 @@ public class MessagingService implements
}
}
- public byte[] hash(String type, byte data[])
+ public static byte[] hash(String type, byte data[])
{
byte result[];
try
@@ -204,7 +197,7 @@ public class MessagingService implements
}
}
- public static OutboundTcpConnectionPool getConnectionPool(InetAddress to)
+ public OutboundTcpConnectionPool getConnectionPool(InetAddress to)
{
OutboundTcpConnectionPool cp = connectionManagers_.get(to);
if (cp == null)
@@ -215,7 +208,7 @@ public class MessagingService implements
return cp;
}
- public static OutboundTcpConnection getConnection(InetAddress to, Message msg)
+ public OutboundTcpConnection getConnection(InetAddress to, Message msg)
{
return getConnectionPool(to).getConnection(msg);
}
@@ -275,7 +268,7 @@ public class MessagingService implements
addresses.add(endpoint);
}
- private static void removeTarget(String messageId, InetAddress from)
+ private void removeTarget(String messageId, InetAddress from)
{
Collection<InetAddress> addresses = targets.get(messageId);
// null is expected if we removed the callback or we got a reply after its timeout expired
@@ -346,7 +339,7 @@ public class MessagingService implements
// do local deliveries
if ( message.getFrom().equals(to) )
{
- MessagingService.receive(message);
+ receive(message);
return;
}
@@ -407,19 +400,19 @@ public class MessagingService implements
}
/** blocks until the processing pools are empty and done. */
- public static void waitFor() throws InterruptedException
+ public void waitFor() throws InterruptedException
{
while (!streamExecutor_.isTerminated())
streamExecutor_.awaitTermination(5, TimeUnit.SECONDS);
}
- public static void shutdown()
+ public void shutdown()
{
logger_.info("Shutting down MessageService...");
try
{
- instance().socketThread.close();
+ socketThread.close();
}
catch (IOException e)
{
@@ -432,7 +425,7 @@ public class MessagingService implements
logger_.info("Shutdown complete (no further commands will be processed)");
}
- public static void receive(Message message)
+ public void receive(Message message)
{
message = SinkManager.processServerMessage(message);
if (message == null)
@@ -444,23 +437,23 @@ public class MessagingService implements
stage.execute(runnable);
}
- public static IMessageCallback getRegisteredCallback(String messageId)
+ public IMessageCallback getRegisteredCallback(String messageId)
{
return callbacks.get(messageId);
}
- public static IMessageCallback removeRegisteredCallback(String messageId)
+ public IMessageCallback removeRegisteredCallback(String messageId)
{
targets.remove(messageId); // TODO fix this when we clean up quorum reads to do proper RR
return callbacks.remove(messageId);
}
- public static long getRegisteredCallbackAge(String messageId)
+ public long getRegisteredCallbackAge(String messageId)
{
return callbacks.getAge(messageId);
}
- public static void responseReceivedFrom(String messageId, InetAddress from)
+ public void responseReceivedFrom(String messageId, InetAddress from)
{
removeTarget(messageId, from);
}
@@ -476,7 +469,7 @@ public class MessagingService implements
return x >>> (p + 1) - n & ~(-1 << n);
}
- public static ByteBuffer packIt(byte[] bytes, boolean compress)
+ public ByteBuffer packIt(byte[] bytes, boolean compress)
{
/*
Setting up the protocol header. This is 4 bytes long
@@ -506,7 +499,7 @@ public class MessagingService implements
return buffer;
}
- public static ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean compress)
+ public ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean compress)
{
/*
Setting up the protocol header. This is 4 bytes long
@@ -557,12 +550,12 @@ public class MessagingService implements
return buffer;
}
- public static int incrementDroppedMessages(StorageService.Verb verb)
+ public int incrementDroppedMessages(StorageService.Verb verb)
{
return droppedMessages.get(verb).incrementAndGet();
}
- private static void logDroppedMessages()
+ private void logDroppedMessages()
{
boolean logTpstats = false;
for (Map.Entry<StorageService.Verb, AtomicInteger> entry : droppedMessages.entrySet())
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1057930&r1=1057929&r2=1057930&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Tue Jan 11 23:36:37 2011
@@ -37,9 +37,9 @@ public class ResponseVerbHandler impleme
public void doVerb(Message message)
{
String messageId = message.getMessageId();
- MessagingService.responseReceivedFrom(messageId, message.getFrom());
- double age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId);
- IMessageCallback cb = MessagingService.getRegisteredCallback(messageId);
+ MessagingService.instance().responseReceivedFrom(messageId, message.getFrom());
+ double age = System.currentTimeMillis() - MessagingService.instance().getRegisteredCallbackAge(messageId);
+ IMessageCallback cb = MessagingService.instance().getRegisteredCallback(messageId);
if (cb == null)
return;
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1057930&r1=1057929&r2=1057930&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java Tue Jan 11 23:36:37 2011
@@ -83,9 +83,10 @@ public class ReadCallback<T> implements
public void close()
{
+ MessagingService ms = MessagingService.instance();
for (Message response : resolver.getMessages())
{
- MessagingService.removeRegisteredCallback(response.getMessageId());
+ ms.removeRegisteredCallback(response.getMessageId());
}
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1057930&r1=1057929&r2=1057930&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java Tue Jan 11 23:36:37 2011
@@ -273,7 +273,7 @@ public class StorageService implements I
Gossiper.instance.unregister(migrationManager);
Gossiper.instance.unregister(this);
Gossiper.instance.stop();
- MessagingService.shutdown();
+ MessagingService.instance().shutdown();
StageManager.shutdownNow();
}
@@ -1500,7 +1500,7 @@ public class StorageService implements I
public void run()
{
Gossiper.instance.stop();
- MessagingService.shutdown();
+ MessagingService.instance().shutdown();
StageManager.shutdownNow();
setMode("Decommissioned", true);
// let op be responsible for killing the process
@@ -1797,9 +1797,9 @@ public class StorageService implements I
setMode("Starting drain process", true);
Gossiper.instance.stop();
setMode("Draining: shutting down MessageService", false);
- MessagingService.shutdown();
+ MessagingService.instance().shutdown();
setMode("Draining: emptying MessageService pools", false);
- MessagingService.waitFor();
+ MessagingService.instance().waitFor();
setMode("Draining: clearing mutation stage", false);
mutationStage.shutdown();
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/FileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1057930&r1=1057929&r2=1057930&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/FileStreamTask.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/FileStreamTask.java Tue Jan 11 23:36:37 2011
@@ -84,7 +84,7 @@ public class FileStreamTask extends Wrap
private void stream(SocketChannel channel) throws IOException
{
- ByteBuffer buffer = MessagingService.constructStreamHeader(header, false);
+ ByteBuffer buffer = MessagingService.instance().constructStreamHeader(header, false);
channel.write(buffer);
assert buffer.remaining() == 0;
if (header.file == null)
Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java?rev=1057930&r1=1057929&r2=1057930&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java Tue Jan 11 23:36:37 2011
@@ -84,7 +84,7 @@ public class RemoveTest extends CleanupH
public void tearDown()
{
SinkManager.clear();
- MessagingService.shutdown();
+ MessagingService.instance().shutdown();
ss.setPartitionerUnsafe(oldPartitioner);
}