You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/31 01:24:12 UTC

svn commit: r904930 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: net/io/ streaming/

Author: jbellis
Date: Sun Jan 31 00:24:12 2010
New Revision: 904930

URL: http://svn.apache.org/viewvc?rev=904930&view=rev
Log:
extract SIM.StreamStatus to CompletedFileStatus
patch by jbellis; reviewed by stuhood for CASSANDRA-751

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java   (with props)
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java?rev=904930&r1=904929&r2=904930&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java Sun Jan 31 00:24:12 2010
@@ -27,6 +27,7 @@
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.net.FileStreamTask;
+import org.apache.cassandra.streaming.CompletedFileStatus;
 import org.apache.cassandra.streaming.IStreamComplete;
 import org.apache.cassandra.streaming.InitiatedFile;
 import org.apache.cassandra.streaming.StreamInManager;
@@ -35,7 +36,7 @@
 {
     private static Logger logger = Logger.getLogger(IncomingStreamReader.class);
     private InitiatedFile initiatedFile;
-    private StreamInManager.StreamStatus streamStatus;
+    private CompletedFileStatus streamStatus;
     private SocketChannel socketChannel;
 
     public IncomingStreamReader(SocketChannel socketChannel)
@@ -65,7 +66,7 @@
         catch (IOException ex)
         {
             /* Ask the source node to re-stream this file. */
-            streamStatus.setAction(StreamInManager.StreamCompletionAction.STREAM);
+            streamStatus.setAction(CompletedFileStatus.StreamCompletionAction.STREAM);
             handleStreamCompletion(remoteAddress.getAddress());
             /* Delete the orphaned file. */
             File file = new File(initiatedFile.getTargetFile());

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java?rev=904930&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java Sun Jan 31 00:24:12 2010
@@ -0,0 +1,100 @@
+package org.apache.cassandra.streaming;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class CompletedFileStatus // Status of one streamed file: its name, expected byte count, and the completion action; serializable into a STREAM_FINISHED message.
+{
+    private static ICompactSerializer<CompletedFileStatus> serializer_; // single shared serializer, assigned once in the static initializer below
+
+    public static enum StreamCompletionAction // written to the wire as ordinal(), so the declaration order of the constants is part of the serialized format
+    {
+        DELETE, // the action every new CompletedFileStatus starts with (see constructor)
+        STREAM // IncomingStreamReader sets this after an IOException to ask the source node to re-stream the file
+    }
+
+    static
+    {
+        serializer_ = new CompletedFileStatusSerializer();
+    }
+
+    public static ICompactSerializer<CompletedFileStatus> serializer() // returns the shared serializer instance
+    {
+        return serializer_;
+    }
+
+    private String file_; // file name as passed to the constructor; written with writeUTF on serialization
+    private long expectedBytes_; // expected size in bytes as passed to the constructor
+    private StreamCompletionAction action_; // mutable via setAction(); initialised to DELETE
+
+    public CompletedFileStatus(String file, long expectedBytes) // creates a status whose action defaults to DELETE
+    {
+        file_ = file;
+        expectedBytes_ = expectedBytes;
+        action_ = StreamCompletionAction.DELETE;
+    }
+
+    public String getFile() // returns the file name given at construction
+    {
+        return file_;
+    }
+
+    public long getExpectedBytes() // returns the expected byte count given at construction
+    {
+        return expectedBytes_;
+    }
+
+    public void setAction(StreamCompletionAction action) // replaces the current action; no null check is performed here
+    {
+        action_ = action;
+    }
+
+    public StreamCompletionAction getAction() // returns the current action
+    {
+        return action_;
+    }
+
+    public Message makeStreamStatusMessage() throws IOException // serializes this status and wraps the bytes in a STREAM_FINISHED Message originating from the local address
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream( bos );
+        CompletedFileStatus.serializer().serialize(this, dos);
+        return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_FINISHED, bos.toByteArray()); // NOTE(review): the second argument is an empty string; its meaning is defined by Message and is not visible here
+    }
+
+    private static class CompletedFileStatusSerializer implements ICompactSerializer<CompletedFileStatus> // wire layout: UTF file name, long expected bytes, int action ordinal
+    {
+        public void serialize(CompletedFileStatus streamStatus, DataOutputStream dos) throws IOException // writes the three fields in the order deserialize() reads them
+        {
+            dos.writeUTF(streamStatus.getFile());
+            dos.writeLong(streamStatus.getExpectedBytes());
+            dos.writeInt(streamStatus.getAction().ordinal()); // throws NullPointerException if the action was set to null
+        }
+
+        public CompletedFileStatus deserialize(DataInputStream dis) throws IOException // reads file name, expected bytes and action ordinal, in that order, and rebuilds the status
+        {
+            String targetFile = dis.readUTF();
+            long expectedBytes = dis.readLong();
+            CompletedFileStatus streamStatus = new CompletedFileStatus(targetFile, expectedBytes); // action is DELETE at this point
+
+            int ordinal = dis.readInt(); // NOTE(review): an ordinal matching neither constant falls through both branches and silently keeps the DELETE default — confirm this is intended
+            if ( ordinal == StreamCompletionAction.DELETE.ordinal() )
+            {
+                streamStatus.setAction(StreamCompletionAction.DELETE);
+            }
+            else if ( ordinal == StreamCompletionAction.STREAM.ordinal() )
+            {
+                streamStatus.setAction(StreamCompletionAction.STREAM);
+            }
+
+            return streamStatus;
+        }
+    }
+}

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java?rev=904930&r1=904929&r2=904930&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java Sun Jan 31 00:24:12 2010
@@ -22,13 +22,11 @@
 
 import java.net.InetAddress;
 
-import org.apache.cassandra.streaming.StreamInManager;
-
 public interface IStreamComplete
 {
     /*
      * This callback if registered with the StreamContextManager is 
      * called when the stream from a host is completely handled. 
     */
-    public void onStreamCompletion(InetAddress from, InitiatedFile initiatedFile, StreamInManager.StreamStatus streamStatus) throws IOException;
+    public void onStreamCompletion(InetAddress from, InitiatedFile initiatedFile, CompletedFileStatus streamStatus) throws IOException;
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java?rev=904930&r1=904929&r2=904930&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java Sun Jan 31 00:24:12 2010
@@ -9,7 +9,6 @@
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.io.SSTableWriter;
-import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.IStreamComplete;
 import org.apache.cassandra.streaming.StreamInManager;
@@ -25,7 +24,7 @@
 {
     private static Logger logger = Logger.getLogger(StreamCompletionHandler.class);
 
-    public void onStreamCompletion(InetAddress host, InitiatedFile initiatedFile, StreamInManager.StreamStatus streamStatus) throws IOException
+    public void onStreamCompletion(InetAddress host, InitiatedFile initiatedFile, CompletedFileStatus streamStatus) throws IOException
     {
         /* Parse the stream context and the file to the list of SSTables in the associated Column Family Store. */
         if (initiatedFile.getTargetFile().contains("-Data.db"))

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=904930&r1=904929&r2=904930&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java Sun Jan 31 00:24:12 2010
@@ -9,7 +9,6 @@
 
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.streaming.StreamInManager;
 import org.apache.cassandra.streaming.StreamOutManager;
 
 public class StreamFinishedVerbHandler implements IVerbHandler
@@ -23,7 +22,7 @@
 
         try
         {
-            StreamInManager.StreamStatus streamStatus = StreamInManager.StreamStatus.serializer().deserialize(new DataInputStream(bufIn));
+            CompletedFileStatus streamStatus = CompletedFileStatus.serializer().deserialize(new DataInputStream(bufIn));
 
             switch (streamStatus.getAction())
             {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java?rev=904930&r1=904929&r2=904930&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java Sun Jan 31 00:24:12 2010
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.streaming;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -26,110 +25,18 @@
 import java.net.InetAddress;
 
 import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.Message;
 import org.apache.cassandra.streaming.IStreamComplete;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
 
 import org.apache.log4j.Logger;
 
 public class StreamInManager
 {
     private static final Logger logger = Logger.getLogger(StreamInManager.class);
-    
-    public static enum StreamCompletionAction
-    {
-        DELETE,
-        STREAM
-    }
-    
-    public static class StreamStatus
-    {
-        private static ICompactSerializer<StreamStatus> serializer_;
-        
-        static 
-        {
-            serializer_ = new StreamStatusSerializer();
-        }
-        
-        public static ICompactSerializer<StreamStatus> serializer()
-        {
-            return serializer_;
-        }
-            
-        private String file_;               
-        private long expectedBytes_;                
-        private StreamCompletionAction action_;
-                
-        public StreamStatus(String file, long expectedBytes)
-        {
-            file_ = file;
-            expectedBytes_ = expectedBytes;
-            action_ = StreamInManager.StreamCompletionAction.DELETE;
-        }
-        
-        public String getFile()
-        {
-            return file_;
-        }
-        
-        public long getExpectedBytes()
-        {
-            return expectedBytes_;
-        }
-        
-        public void setAction(StreamInManager.StreamCompletionAction action)
-        {
-            action_ = action;
-        }
-        
-        public StreamInManager.StreamCompletionAction getAction()
-        {
-            return action_;
-        }
 
-        public Message makeStreamStatusMessage() throws IOException
-        {
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            DataOutputStream dos = new DataOutputStream( bos );
-            StreamStatus.serializer().serialize(this, dos);
-            return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_FINISHED, bos.toByteArray());
-        }
-    }
-    
-    public static class StreamStatusSerializer implements ICompactSerializer<StreamStatus>
-    {
-        public void serialize(StreamStatus streamStatus, DataOutputStream dos) throws IOException
-        {
-            dos.writeUTF(streamStatus.getFile());
-            dos.writeLong(streamStatus.getExpectedBytes());
-            dos.writeInt(streamStatus.getAction().ordinal());
-        }
-        
-        public StreamStatus deserialize(DataInputStream dis) throws IOException
-        {
-            String targetFile = dis.readUTF();
-            long expectedBytes = dis.readLong();
-            StreamStatus streamStatus = new StreamStatus(targetFile, expectedBytes);
-            
-            int ordinal = dis.readInt();                        
-            if ( ordinal == StreamCompletionAction.DELETE.ordinal() )
-            {
-                streamStatus.setAction(StreamCompletionAction.DELETE);
-            }
-            else if ( ordinal == StreamCompletionAction.STREAM.ordinal() )
-            {
-                streamStatus.setAction(StreamCompletionAction.STREAM);
-            }
-            
-            return streamStatus;
-        }
-    }
-                
     /* Maintain a stream context per host that is the source of the stream */
     public static final Map<InetAddress, List<InitiatedFile>> ctxBag_ = new Hashtable<InetAddress, List<InitiatedFile>>();
     /* Maintain in this map the status of the streams that need to be sent back to the source */
-    public static final Map<InetAddress, List<StreamStatus>> streamStatusBag_ = new Hashtable<InetAddress, List<StreamStatus>>();
+    public static final Map<InetAddress, List<CompletedFileStatus>> streamStatusBag_ = new Hashtable<InetAddress, List<CompletedFileStatus>>();
     /* Maintains a callback handler per endpoint to notify the app that a stream from a given endpoint has been handled */
     public static final Map<InetAddress, IStreamComplete> streamNotificationHandlers_ = new HashMap<InetAddress, IStreamComplete>();
     
@@ -144,12 +51,12 @@
         return initiatedFile;
     }
     
-    public synchronized static StreamStatus getStreamStatus(InetAddress key)
+    public synchronized static CompletedFileStatus getStreamStatus(InetAddress key)
     {
-        List<StreamStatus> status = streamStatusBag_.get(key);
+        List<CompletedFileStatus> status = streamStatusBag_.get(key);
         if ( status == null )
             throw new IllegalStateException("Streaming status has not been set for " + key);
-        StreamStatus streamStatus = status.remove(0);        
+        CompletedFileStatus streamStatus = status.remove(0);
         if ( status.isEmpty() )
             streamStatusBag_.remove(key);
         return streamStatus;
@@ -179,7 +86,7 @@
         streamNotificationHandlers_.put(key, streamComplete);
     }
     
-    public synchronized static void addStreamContext(InetAddress key, InitiatedFile initiatedFile, StreamStatus streamStatus)
+    public synchronized static void addStreamContext(InetAddress key, InitiatedFile initiatedFile, CompletedFileStatus streamStatus)
     {
         /* Record the stream context */
         List<InitiatedFile> context = ctxBag_.get(key);
@@ -191,10 +98,10 @@
         context.add(initiatedFile);
         
         /* Record the stream status for this stream context */
-        List<StreamStatus> status = streamStatusBag_.get(key);
+        List<CompletedFileStatus> status = streamStatusBag_.get(key);
         if ( status == null )
         {
-            status = new ArrayList<StreamStatus>();
+            status = new ArrayList<CompletedFileStatus>();
             streamStatusBag_.put(key, status);
         }
         status.add( streamStatus );

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=904930&r1=904929&r2=904930&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java Sun Jan 31 00:24:12 2010
@@ -63,7 +63,7 @@
             */
             for (InitiatedFile initiatedFile : initiatedFiles)
             {
-                StreamInManager.StreamStatus streamStatus = new StreamInManager.StreamStatus(initiatedFile.getTargetFile(), initiatedFile.getExpectedBytes() );
+                CompletedFileStatus streamStatus = new CompletedFileStatus(initiatedFile.getTargetFile(), initiatedFile.getExpectedBytes() );
                 String file = getNewFileNameFromOldContextAndNames(fileNames, pathNames, initiatedFile);
 
                 if (logger.isDebugEnabled())
@@ -136,7 +136,7 @@
         return fileNames;
     }
 
-    private void addStreamContext(InetAddress host, InitiatedFile initiatedFile, StreamInManager.StreamStatus streamStatus)
+    private void addStreamContext(InetAddress host, InitiatedFile initiatedFile, CompletedFileStatus streamStatus)
     {
         if (logger.isDebugEnabled())
           logger.debug("Adding stream context " + initiatedFile + " for " + host + " ...");