You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/31 01:23:42 UTC

svn commit: r904929 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming: StreamCompletionHandler.java StreamFinishedVerbHandler.java StreamInManager.java

Author: jbellis
Date: Sun Jan 31 00:23:42 2010
New Revision: 904929

URL: http://svn.apache.org/viewvc?rev=904929&view=rev
Log:
r/m unneeded StreamStatusMessage wrapper
patch by jbellis; reviewed by stuhood for CASSANDRA-751

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java?rev=904929&r1=904928&r2=904929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java Sun Jan 31 00:23:42 2010
@@ -51,10 +51,8 @@
 
         if (logger.isDebugEnabled())
           logger.debug("Sending a streaming finished message with " + streamStatus + " to " + host);
-        /* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
-        StreamInManager.StreamStatusMessage streamStatusMessage = new StreamInManager.StreamStatusMessage(streamStatus);
-        Message message = StreamInManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
-        MessagingService.instance.sendOneWay(message, host);
+        /* Send a StreamStatus message which may require the source node to re-stream certain files. */
+        MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), host);
 
         /* If we're done with everything for this host, remove from bootstrap sources */
         if (StreamInManager.isDone(host) && StorageService.instance.isBootstrapMode())

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=904929&r1=904928&r2=904929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java Sun Jan 31 00:23:42 2010
@@ -23,8 +23,7 @@
 
         try
         {
-            StreamInManager.StreamStatusMessage streamStatusMessage = StreamInManager.StreamStatusMessage.serializer().deserialize(new DataInputStream(bufIn));
-            StreamInManager.StreamStatus streamStatus = streamStatusMessage.getStreamStatus();
+            StreamInManager.StreamStatus streamStatus = StreamInManager.StreamStatus.serializer().deserialize(new DataInputStream(bufIn));
 
             switch (streamStatus.getAction())
             {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java?rev=904929&r1=904928&r2=904929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java Sun Jan 31 00:23:42 2010
@@ -87,6 +87,14 @@
         {
             return action_;
         }
+
+        public Message makeStreamStatusMessage() throws IOException
+        {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream( bos );
+            StreamStatus.serializer().serialize(this, dos);
+            return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_FINISHED, bos.toByteArray());
+        }
     }
     
     public static class StreamStatusSerializer implements ICompactSerializer<StreamStatus>
@@ -117,56 +125,7 @@
             return streamStatus;
         }
     }
-    
-    public static class StreamStatusMessage
-    {
-        private static ICompactSerializer<StreamStatusMessage> serializer_;
-        
-        static 
-        {
-            serializer_ = new StreamStatusMessageSerializer();
-        }
-        
-        public static ICompactSerializer<StreamStatusMessage> serializer()
-        {
-            return serializer_;
-        }
-        
-        public static Message makeStreamStatusMessage(StreamStatusMessage streamStatusMessage) throws IOException
-        {
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            DataOutputStream dos = new DataOutputStream( bos );
-            StreamStatusMessage.serializer().serialize(streamStatusMessage, dos);
-            return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_FINISHED, bos.toByteArray());
-        }
-        
-        protected StreamInManager.StreamStatus streamStatus_;
-        
-        public StreamStatusMessage(StreamInManager.StreamStatus streamStatus)
-        {
-            streamStatus_ = streamStatus;
-        }
-        
-        public StreamInManager.StreamStatus getStreamStatus()
-        {
-            return streamStatus_;
-        }
-    }
-    
-    public static class StreamStatusMessageSerializer implements ICompactSerializer<StreamStatusMessage>
-    {
-        public void serialize(StreamStatusMessage streamStatusMessage, DataOutputStream dos) throws IOException
-        {
-            StreamStatus.serializer().serialize(streamStatusMessage.streamStatus_, dos);            
-        }
-        
-        public StreamStatusMessage deserialize(DataInputStream dis) throws IOException
-        {            
-            StreamInManager.StreamStatus streamStatus = StreamStatus.serializer().deserialize(dis);
-            return new StreamStatusMessage(streamStatus);
-        }
-    }
-        
+                
     /* Maintain a stream context per host that is the source of the stream */
     public static final Map<InetAddress, List<InitiatedFile>> ctxBag_ = new Hashtable<InetAddress, List<InitiatedFile>>();
     /* Maintain in this map the status of the streams that need to be sent back to the source */