You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/31 01:23:19 UTC

svn commit: r904927 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/net/io/ src/java/org/apache/cassandra/streaming/ test/unit/org/apache/cassandra/dht/

Author: jbellis
Date: Sun Jan 31 00:23:19 2010
New Revision: 904927

URL: http://svn.apache.org/viewvc?rev=904927&view=rev
Log:
rename StreamContextManager -> StreamInManager, StreamManger -> StreamOutManager, extract SCM.StreamContext -> InitiatedFile
patch by jbellis; reviewed by stuhood for CASSANDRA-751

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java   (with props)
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java   (contents, props changed)
      - copied, changed from r904925, incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java   (contents, props changed)
      - copied, changed from r904925, incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java
Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java?rev=904927&r1=904926&r2=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java Sun Jan 31 00:23:19 2010
@@ -25,7 +25,7 @@
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.streaming.InitiatedFile;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -51,14 +51,14 @@
         return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_INITIATE, bos.toByteArray() );
     }
     
-    protected StreamContextManager.StreamContext[] streamContexts_ = new StreamContextManager.StreamContext[0];
+    protected InitiatedFile[] streamContexts_ = new InitiatedFile[0];
    
-    public StreamInitiateMessage(StreamContextManager.StreamContext[] streamContexts)
+    public StreamInitiateMessage(InitiatedFile[] initiatedFiles)
     {
-        streamContexts_ = streamContexts;
+        streamContexts_ = initiatedFiles;
     }
     
-    public StreamContextManager.StreamContext[] getStreamContext()
+    public InitiatedFile[] getStreamContext()
     {
         return streamContexts_;
     }
@@ -69,25 +69,25 @@
     public void serialize(StreamInitiateMessage bim, DataOutputStream dos) throws IOException
     {
         dos.writeInt(bim.streamContexts_.length);
-        for ( StreamContextManager.StreamContext streamContext : bim.streamContexts_ )
+        for ( InitiatedFile initiatedFile : bim.streamContexts_ )
         {
-            StreamContextManager.StreamContext.serializer().serialize(streamContext, dos);
+            InitiatedFile.serializer().serialize(initiatedFile, dos);
         }
     }
     
     public StreamInitiateMessage deserialize(DataInputStream dis) throws IOException
     {
         int size = dis.readInt();
-        StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[0];
+        InitiatedFile[] initiatedFiles = new InitiatedFile[0];
         if ( size > 0 )
         {
-            streamContexts = new StreamContextManager.StreamContext[size];
+            initiatedFiles = new InitiatedFile[size];
             for ( int i = 0; i < size; ++i )
             {
-                streamContexts[i] = StreamContextManager.StreamContext.serializer().deserialize(dis);
+                initiatedFiles[i] = InitiatedFile.serializer().deserialize(dis);
             }
         }
-        return new StreamInitiateMessage(streamContexts);
+        return new StreamInitiateMessage(initiatedFiles);
     }
 }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java?rev=904927&r1=904926&r2=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java Sun Jan 31 00:23:19 2010
@@ -28,22 +28,23 @@
 
 import org.apache.cassandra.net.FileStreamTask;
 import org.apache.cassandra.streaming.IStreamComplete;
-import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.streaming.InitiatedFile;
+import org.apache.cassandra.streaming.StreamInManager;
 
 public class IncomingStreamReader
 {
     private static Logger logger = Logger.getLogger(IncomingStreamReader.class);
-    private StreamContextManager.StreamContext streamContext;
-    private StreamContextManager.StreamStatus streamStatus;
+    private InitiatedFile initiatedFile;
+    private StreamInManager.StreamStatus streamStatus;
     private SocketChannel socketChannel;
 
     public IncomingStreamReader(SocketChannel socketChannel)
     {
         this.socketChannel = socketChannel;
         InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
-        streamContext = StreamContextManager.getStreamContext(remoteAddress.getAddress());
-        assert streamContext != null;
-        streamStatus = StreamContextManager.getStreamStatus(remoteAddress.getAddress());
+        initiatedFile = StreamInManager.getStreamContext(remoteAddress.getAddress());
+        assert initiatedFile != null;
+        streamStatus = StreamInManager.getStreamStatus(remoteAddress.getAddress());
         assert streamStatus != null;
     }
 
@@ -51,32 +52,32 @@
     {
         InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
         if (logger.isDebugEnabled())
-          logger.debug("Creating file for " + streamContext.getTargetFile());
-        FileOutputStream fos = new FileOutputStream(streamContext.getTargetFile(), true);
+          logger.debug("Creating file for " + initiatedFile.getTargetFile());
+        FileOutputStream fos = new FileOutputStream(initiatedFile.getTargetFile(), true);
         FileChannel fc = fos.getChannel();
 
         long bytesRead = 0;
         try
         {
-            while (bytesRead < streamContext.getExpectedBytes())
+            while (bytesRead < initiatedFile.getExpectedBytes())
                 bytesRead += fc.transferFrom(socketChannel, bytesRead, FileStreamTask.CHUNK_SIZE);
         }
         catch (IOException ex)
         {
             /* Ask the source node to re-stream this file. */
-            streamStatus.setAction(StreamContextManager.StreamCompletionAction.STREAM);
+            streamStatus.setAction(StreamInManager.StreamCompletionAction.STREAM);
             handleStreamCompletion(remoteAddress.getAddress());
             /* Delete the orphaned file. */
-            File file = new File(streamContext.getTargetFile());
+            File file = new File(initiatedFile.getTargetFile());
             file.delete();
             throw ex;
         }
 
-        if (bytesRead == streamContext.getExpectedBytes())
+        if (bytesRead == initiatedFile.getExpectedBytes())
         {
             if (logger.isDebugEnabled())
             {
-                logger.debug("Removing stream context " + streamContext);
+                logger.debug("Removing stream context " + initiatedFile);
             }
             fc.close();
             handleStreamCompletion(remoteAddress.getAddress());
@@ -89,8 +90,8 @@
          * Streaming is complete. If all the data that has to be received inform the sender via
          * the stream completion callback so that the source may perform the requisite cleanup.
         */
-        IStreamComplete streamComplete = StreamContextManager.getStreamCompletionHandler(remoteHost);
+        IStreamComplete streamComplete = StreamInManager.getStreamCompletionHandler(remoteHost);
         if (streamComplete != null)
-            streamComplete.onStreamCompletion(remoteHost, streamContext, streamStatus);
+            streamComplete.onStreamCompletion(remoteHost, initiatedFile, streamStatus);
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java?rev=904927&r1=904926&r2=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java Sun Jan 31 00:23:19 2010
@@ -22,7 +22,7 @@
 
 import java.net.InetAddress;
 
-import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.streaming.StreamInManager;
 
 public interface IStreamComplete
 {
@@ -30,5 +30,5 @@
      * This callback if registered with the StreamContextManager is 
      * called when the stream from a host is completely handled. 
     */
-    public void onStreamCompletion(InetAddress from, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException;
+    public void onStreamCompletion(InetAddress from, InitiatedFile initiatedFile, StreamInManager.StreamStatus streamStatus) throws IOException;
 }

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java?rev=904927&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java Sun Jan 31 00:23:19 2010
@@ -0,0 +1,90 @@
+package org.apache.cassandra.streaming;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+
+public class InitiatedFile
+{
+    private static ICompactSerializer<InitiatedFile> serializer_;
+
+    static
+    {
+        serializer_ = new InitiatedFileSerializer();
+    }
+
+    public static ICompactSerializer<InitiatedFile> serializer()
+    {
+        return serializer_;
+    }
+
+    private String targetFile_;
+    private long expectedBytes_;
+    private String table_;
+
+    public InitiatedFile(String targetFile, long expectedBytes, String table)
+    {
+        targetFile_ = targetFile;
+        expectedBytes_ = expectedBytes;
+        table_ = table;
+    }
+
+    public String getTable()
+    {
+        return table_;
+    }
+
+    public String getTargetFile()
+    {
+        return targetFile_;
+    }
+
+    public void setTargetFile(String file)
+    {
+        targetFile_ = file;
+    }
+
+    public long getExpectedBytes()
+    {
+        return expectedBytes_;
+    }
+
+    public boolean equals(Object o)
+    {
+        if ( !(o instanceof InitiatedFile) )
+            return false;
+
+        InitiatedFile rhs = (InitiatedFile)o;
+        return targetFile_.equals(rhs.targetFile_);
+    }
+
+    public int hashCode()
+    {
+        return toString().hashCode();
+    }
+
+    public String toString()
+    {
+        return targetFile_ + ":" + expectedBytes_;
+    }
+
+    private static class InitiatedFileSerializer implements ICompactSerializer<InitiatedFile>
+    {
+        public void serialize(InitiatedFile sc, DataOutputStream dos) throws IOException
+        {
+            dos.writeUTF(sc.targetFile_);
+            dos.writeLong(sc.expectedBytes_);
+            dos.writeUTF(sc.table_);
+        }
+
+        public InitiatedFile deserialize(DataInputStream dis) throws IOException
+        {
+            String targetFile = dis.readUTF();
+            long expectedBytes = dis.readLong();
+            String table = dis.readUTF();
+            return new InitiatedFile(targetFile, expectedBytes, table);
+        }
+    }
+}

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java?rev=904927&r1=904926&r2=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java Sun Jan 31 00:23:19 2010
@@ -12,7 +12,7 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.IStreamComplete;
-import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.streaming.StreamInManager;
 import org.apache.cassandra.service.StorageService;
 
 /**
@@ -25,41 +25,41 @@
 {
     private static Logger logger = Logger.getLogger(StreamCompletionHandler.class);
 
-    public void onStreamCompletion(InetAddress host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException
+    public void onStreamCompletion(InetAddress host, InitiatedFile initiatedFile, StreamInManager.StreamStatus streamStatus) throws IOException
     {
         /* Parse the stream context and the file to the list of SSTables in the associated Column Family Store. */
-        if (streamContext.getTargetFile().contains("-Data.db"))
+        if (initiatedFile.getTargetFile().contains("-Data.db"))
         {
-            String tableName = streamContext.getTable();
-            File file = new File( streamContext.getTargetFile() );
+            String tableName = initiatedFile.getTable();
+            File file = new File( initiatedFile.getTargetFile() );
             String fileName = file.getName();
             String [] temp = fileName.split("-");
 
             //Open the file to see if all parts are now here
             try
             {
-                SSTableReader sstable = SSTableWriter.renameAndOpen(streamContext.getTargetFile());
+                SSTableReader sstable = SSTableWriter.renameAndOpen(initiatedFile.getTargetFile());
                 //TODO add a sanity check that this sstable has all its parts and is ok
                 Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
                 logger.info("Streaming added " + sstable.getFilename());
             }
             catch (IOException e)
             {
-                throw new RuntimeException("Not able to add streamed file " + streamContext.getTargetFile(), e);
+                throw new RuntimeException("Not able to add streamed file " + initiatedFile.getTargetFile(), e);
             }
         }
 
         if (logger.isDebugEnabled())
           logger.debug("Sending a streaming finished message with " + streamStatus + " to " + host);
         /* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
-        StreamContextManager.StreamStatusMessage streamStatusMessage = new StreamContextManager.StreamStatusMessage(streamStatus);
-        Message message = StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
+        StreamInManager.StreamStatusMessage streamStatusMessage = new StreamInManager.StreamStatusMessage(streamStatus);
+        Message message = StreamInManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
         MessagingService.instance.sendOneWay(message, host);
 
         /* If we're done with everything for this host, remove from bootstrap sources */
-        if (StreamContextManager.isDone(host) && StorageService.instance.isBootstrapMode())
+        if (StreamInManager.isDone(host) && StorageService.instance.isBootstrapMode())
         {
-            StorageService.instance.removeBootstrapSource(host, streamContext.getTable());
+            StorageService.instance.removeBootstrapSource(host, initiatedFile.getTable());
         }
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=904927&r1=904926&r2=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java Sun Jan 31 00:23:19 2010
@@ -9,8 +9,8 @@
 
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.streaming.StreamContextManager;
-import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamInManager;
+import org.apache.cassandra.streaming.StreamOutManager;
 
 public class StreamFinishedVerbHandler implements IVerbHandler
 {
@@ -23,19 +23,19 @@
 
         try
         {
-            StreamContextManager.StreamStatusMessage streamStatusMessage = StreamContextManager.StreamStatusMessage.serializer().deserialize(new DataInputStream(bufIn));
-            StreamContextManager.StreamStatus streamStatus = streamStatusMessage.getStreamStatus();
+            StreamInManager.StreamStatusMessage streamStatusMessage = StreamInManager.StreamStatusMessage.serializer().deserialize(new DataInputStream(bufIn));
+            StreamInManager.StreamStatus streamStatus = streamStatusMessage.getStreamStatus();
 
             switch (streamStatus.getAction())
             {
                 case DELETE:
-                    StreamManager.get(message.getFrom()).finishAndStartNext(streamStatus.getFile());
+                    StreamOutManager.get(message.getFrom()).finishAndStartNext(streamStatus.getFile());
                     break;
 
                 case STREAM:
                     if (logger.isDebugEnabled())
                         logger.debug("Need to re-stream file " + streamStatus.getFile());
-                    StreamManager.get(message.getFrom()).startNext();
+                    StreamOutManager.get(message.getFrom()).startNext();
                     break;
 
                 default:

Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java (from r904925, incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java&r1=904925&r2=904927&rev=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java Sun Jan 31 00:23:19 2010
@@ -33,9 +33,9 @@
 
 import org.apache.log4j.Logger;
 
-public class StreamContextManager
+public class StreamInManager
 {
-    private static Logger logger = Logger.getLogger(StreamContextManager.class);
+    private static final Logger logger = Logger.getLogger(StreamInManager.class);
     
     public static enum StreamCompletionAction
     {
@@ -43,89 +43,6 @@
         STREAM
     }
     
-    public static class StreamContext
-    {
-        private static ICompactSerializer<StreamContext> serializer_;
-        
-        static
-        {
-            serializer_ = new StreamContextSerializer();
-        }
-        
-        public static ICompactSerializer<StreamContext> serializer()
-        {
-            return serializer_;
-        }
-                
-        private String targetFile_;        
-        private long expectedBytes_;                     
-        private String table_;
-        
-        public StreamContext(String targetFile, long expectedBytes, String table)
-        {
-            targetFile_ = targetFile;
-            expectedBytes_ = expectedBytes;         
-            table_ = table;
-        }
-
-        public String getTable()
-        {
-            return table_;
-        }                
-                
-        public String getTargetFile()
-        {
-            return targetFile_;
-        }
-        
-        public void setTargetFile(String file)
-        {
-            targetFile_ = file;
-        }
-        
-        public long getExpectedBytes()
-        {
-            return expectedBytes_;
-        }
-                
-        public boolean equals(Object o)
-        {
-            if ( !(o instanceof StreamContext) )
-                return false;
-            
-            StreamContext rhs = (StreamContext)o;
-            return targetFile_.equals(rhs.targetFile_);
-        }
-        
-        public int hashCode()
-        {
-            return toString().hashCode();
-        }
-        
-        public String toString()
-        {
-            return targetFile_ + ":" + expectedBytes_;
-        }
-    }
-    
-    public static class StreamContextSerializer implements ICompactSerializer<StreamContext>
-    {
-        public void serialize(StreamContextManager.StreamContext sc, DataOutputStream dos) throws IOException
-        {
-            dos.writeUTF(sc.targetFile_);
-            dos.writeLong(sc.expectedBytes_);            
-            dos.writeUTF(sc.table_);
-        }
-        
-        public StreamContextManager.StreamContext deserialize(DataInputStream dis) throws IOException
-        {
-            String targetFile = dis.readUTF();
-            long expectedBytes = dis.readLong();           
-            String table = dis.readUTF();
-            return new StreamContext(targetFile, expectedBytes, table);
-        }
-    }
-    
     public static class StreamStatus
     {
         private static ICompactSerializer<StreamStatus> serializer_;
@@ -148,7 +65,7 @@
         {
             file_ = file;
             expectedBytes_ = expectedBytes;
-            action_ = StreamContextManager.StreamCompletionAction.DELETE;
+            action_ = StreamInManager.StreamCompletionAction.DELETE;
         }
         
         public String getFile()
@@ -161,12 +78,12 @@
             return expectedBytes_;
         }
         
-        public void setAction(StreamContextManager.StreamCompletionAction action)
+        public void setAction(StreamInManager.StreamCompletionAction action)
         {
             action_ = action;
         }
         
-        public StreamContextManager.StreamCompletionAction getAction()
+        public StreamInManager.StreamCompletionAction getAction()
         {
             return action_;
         }
@@ -223,14 +140,14 @@
             return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_FINISHED, bos.toByteArray());
         }
         
-        protected StreamContextManager.StreamStatus streamStatus_;
+        protected StreamInManager.StreamStatus streamStatus_;
         
-        public StreamStatusMessage(StreamContextManager.StreamStatus streamStatus)
+        public StreamStatusMessage(StreamInManager.StreamStatus streamStatus)
         {
             streamStatus_ = streamStatus;
         }
         
-        public StreamContextManager.StreamStatus getStreamStatus()
+        public StreamInManager.StreamStatus getStreamStatus()
         {
             return streamStatus_;
         }
@@ -245,27 +162,27 @@
         
         public StreamStatusMessage deserialize(DataInputStream dis) throws IOException
         {            
-            StreamContextManager.StreamStatus streamStatus = StreamStatus.serializer().deserialize(dis);         
+            StreamInManager.StreamStatus streamStatus = StreamStatus.serializer().deserialize(dis);
             return new StreamStatusMessage(streamStatus);
         }
     }
         
     /* Maintain a stream context per host that is the source of the stream */
-    public static final Map<InetAddress, List<StreamContext>> ctxBag_ = new Hashtable<InetAddress, List<StreamContext>>();
+    public static final Map<InetAddress, List<InitiatedFile>> ctxBag_ = new Hashtable<InetAddress, List<InitiatedFile>>();
     /* Maintain in this map the status of the streams that need to be sent back to the source */
     public static final Map<InetAddress, List<StreamStatus>> streamStatusBag_ = new Hashtable<InetAddress, List<StreamStatus>>();
     /* Maintains a callback handler per endpoint to notify the app that a stream from a given endpoint has been handled */
     public static final Map<InetAddress, IStreamComplete> streamNotificationHandlers_ = new HashMap<InetAddress, IStreamComplete>();
     
-    public synchronized static StreamContext getStreamContext(InetAddress key)
+    public synchronized static InitiatedFile getStreamContext(InetAddress key)
     {        
-        List<StreamContext> context = ctxBag_.get(key);
+        List<InitiatedFile> context = ctxBag_.get(key);
         if ( context == null )
             throw new IllegalStateException("Streaming context has not been set for " + key);
-        StreamContext streamContext = context.remove(0);        
+        InitiatedFile initiatedFile = context.remove(0);
         if ( context.isEmpty() )
             ctxBag_.remove(key);
-        return streamContext;
+        return initiatedFile;
     }
     
     public synchronized static StreamStatus getStreamStatus(InetAddress key)
@@ -303,16 +220,16 @@
         streamNotificationHandlers_.put(key, streamComplete);
     }
     
-    public synchronized static void addStreamContext(InetAddress key, StreamContext streamContext, StreamStatus streamStatus)
+    public synchronized static void addStreamContext(InetAddress key, InitiatedFile initiatedFile, StreamStatus streamStatus)
     {
         /* Record the stream context */
-        List<StreamContext> context = ctxBag_.get(key);        
+        List<InitiatedFile> context = ctxBag_.get(key);
         if ( context == null )
         {
-            context = new ArrayList<StreamContext>();
+            context = new ArrayList<InitiatedFile>();
             ctxBag_.put(key, context);
         }
-        context.add(streamContext);
+        context.add(initiatedFile);
         
         /* Record the stream status for this stream context */
         List<StreamStatus> status = streamStatusBag_.get(key);

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java?rev=904927&r1=904926&r2=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java Sun Jan 31 00:23:19 2010
@@ -4,7 +4,7 @@
 
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamOutManager;
 
 public class StreamInitiateDoneVerbHandler implements IVerbHandler
 {
@@ -14,6 +14,6 @@
     {
         if (logger.isDebugEnabled())
           logger.debug("Received a stream initiate done message ...");
-        StreamManager.get(message.getFrom()).startNext();
+        StreamOutManager.get(message.getFrom()).startNext();
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=904927&r1=904926&r2=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java Sun Jan 31 00:23:19 2010
@@ -16,7 +16,7 @@
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.streaming.StreamInManager;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -41,9 +41,9 @@
         try
         {
             StreamInitiateMessage biMsg = StreamInitiateMessage.serializer().deserialize(new DataInputStream(bufIn));
-            StreamContextManager.StreamContext[] streamContexts = biMsg.getStreamContext();
+            InitiatedFile[] initiatedFiles = biMsg.getStreamContext();
 
-            if (streamContexts.length == 0)
+            if (initiatedFiles.length == 0)
             {
                 if (logger.isDebugEnabled())
                     logger.debug("no data needed from " + message.getFrom());
@@ -52,7 +52,7 @@
                 return;
             }
 
-            Map<String, String> fileNames = getNewNames(streamContexts);
+            Map<String, String> fileNames = getNewNames(initiatedFiles);
             Map<String, String> pathNames = new HashMap<String, String>();
             for (String ssName : fileNames.keySet())
                 pathNames.put(ssName, DatabaseDescriptor.getNextAvailableDataLocation());
@@ -61,18 +61,18 @@
              * generate the new file names and store the new file names
              * in the StreamContextManager.
             */
-            for (StreamContextManager.StreamContext streamContext : streamContexts )
+            for (InitiatedFile initiatedFile : initiatedFiles)
             {
-                StreamContextManager.StreamStatus streamStatus = new StreamContextManager.StreamStatus(streamContext.getTargetFile(), streamContext.getExpectedBytes() );
-                String file = getNewFileNameFromOldContextAndNames(fileNames, pathNames, streamContext);
+                StreamInManager.StreamStatus streamStatus = new StreamInManager.StreamStatus(initiatedFile.getTargetFile(), initiatedFile.getExpectedBytes() );
+                String file = getNewFileNameFromOldContextAndNames(fileNames, pathNames, initiatedFile);
 
                 if (logger.isDebugEnabled())
-                  logger.debug("Received Data from  : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
-                streamContext.setTargetFile(file);
-                addStreamContext(message.getFrom(), streamContext, streamStatus);
+                  logger.debug("Received Data from  : " + message.getFrom() + " " + initiatedFile.getTargetFile() + " " + file);
+                initiatedFile.setTargetFile(file);
+                addStreamContext(message.getFrom(), initiatedFile, streamStatus);
             }
 
-            StreamContextManager.registerStreamCompletionHandler(message.getFrom(), new StreamCompletionHandler());
+            StreamInManager.registerStreamCompletionHandler(message.getFrom(), new StreamCompletionHandler());
             if (logger.isDebugEnabled())
               logger.debug("Sending a stream initiate done message ...");
             Message doneMessage = new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_INITIATE_DONE, new byte[0] );
@@ -86,23 +86,23 @@
 
     public String getNewFileNameFromOldContextAndNames(Map<String, String> fileNames,
                                                        Map<String, String> pathNames,
-                                                       StreamContextManager.StreamContext streamContext)
+                                                       InitiatedFile initiatedFile)
     {
-        File sourceFile = new File( streamContext.getTargetFile() );
+        File sourceFile = new File( initiatedFile.getTargetFile() );
         String[] piece = FBUtilities.strip(sourceFile.getName(), "-");
         String cfName = piece[0];
         String ssTableNum = piece[1];
         String typeOfFile = piece[2];
 
-        String newFileNameExpanded = fileNames.get(streamContext.getTable() + "-" + cfName + "-" + ssTableNum);
-        String path = pathNames.get(streamContext.getTable() + "-" + cfName + "-" + ssTableNum);
+        String newFileNameExpanded = fileNames.get(initiatedFile.getTable() + "-" + cfName + "-" + ssTableNum);
+        String path = pathNames.get(initiatedFile.getTable() + "-" + cfName + "-" + ssTableNum);
         //Drop type (Data.db) from new FileName
         String newFileName = newFileNameExpanded.replace("Data.db", typeOfFile);
-        return path + File.separator + streamContext.getTable() + File.separator + newFileName;
+        return path + File.separator + initiatedFile.getTable() + File.separator + newFileName;
     }
 
     // todo: this method needs to be private, or package at the very least for easy unit testing.
-    public Map<String, String> getNewNames(StreamContextManager.StreamContext[] streamContexts) throws IOException
+    public Map<String, String> getNewNames(InitiatedFile[] initiatedFiles) throws IOException
     {
         /*
          * Mapping for each file with unique CF-i ---> new file name. For eg.
@@ -113,10 +113,10 @@
         Map<String, String> fileNames = new HashMap<String, String>();
         /* Get the distinct entries from StreamContexts i.e have one entry per Data/Index/Filter file set */
         Set<String> distinctEntries = new HashSet<String>();
-        for ( StreamContextManager.StreamContext streamContext : streamContexts )
+        for ( InitiatedFile initiatedFile : initiatedFiles)
         {
-            String[] pieces = FBUtilities.strip(new File(streamContext.getTargetFile()).getName(), "-");
-            distinctEntries.add(streamContext.getTable() + "-" + pieces[0] + "-" + pieces[1] );
+            String[] pieces = FBUtilities.strip(new File(initiatedFile.getTargetFile()).getName(), "-");
+            distinctEntries.add(initiatedFile.getTable() + "-" + pieces[0] + "-" + pieces[1] );
         }
 
         /* Generate unique file names per entry */
@@ -136,10 +136,10 @@
         return fileNames;
     }
 
-    private void addStreamContext(InetAddress host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus)
+    private void addStreamContext(InetAddress host, InitiatedFile initiatedFile, StreamInManager.StreamStatus streamStatus)
     {
         if (logger.isDebugEnabled())
-          logger.debug("Adding stream context " + streamContext + " for " + host + " ...");
-        StreamContextManager.addStreamContext(host, streamContext, streamStatus);
+          logger.debug("Adding stream context " + initiatedFile + " for " + host + " ...");
+        StreamInManager.addStreamContext(host, initiatedFile, streamStatus);
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=904927&r1=904926&r2=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Sun Jan 31 00:23:19 2010
@@ -38,10 +38,9 @@
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.io.SSTable;
 import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.streaming.StreamContextManager;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamOutManager;
 
 /**
  * This class handles streaming data from one node to another.
@@ -121,31 +120,31 @@
      */
     public static void transferSSTables(InetAddress target, List<SSTableReader> sstables, String table) throws IOException
     {
-        StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[SSTable.FILES_ON_DISK * sstables.size()];
+        InitiatedFile[] initiatedFiles = new InitiatedFile[SSTable.FILES_ON_DISK * sstables.size()];
         int i = 0;
         for (SSTableReader sstable : sstables)
         {
             for (String filename : sstable.getAllFilenames())
             {
                 File file = new File(filename);
-                streamContexts[i++] = new StreamContextManager.StreamContext(file.getAbsolutePath(), file.length(), table);
+                initiatedFiles[i++] = new InitiatedFile(file.getAbsolutePath(), file.length(), table);
             }
         }
         if (logger.isDebugEnabled())
-          logger.debug("Stream context metadata " + StringUtils.join(streamContexts, ", "));
+          logger.debug("Stream context metadata " + StringUtils.join(initiatedFiles, ", "));
 
-        StreamManager.get(target).addFilesToStream(streamContexts);
-        StreamInitiateMessage biMessage = new StreamInitiateMessage(streamContexts);
+        StreamOutManager.get(target).addFilesToStream(initiatedFiles);
+        StreamInitiateMessage biMessage = new StreamInitiateMessage(initiatedFiles);
         Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage);
         message.addHeader(StreamOut.TABLE_NAME, table.getBytes());
         if (logger.isDebugEnabled())
           logger.debug("Sending a stream initiate message to " + target + " ...");
         MessagingService.instance.sendOneWay(message, target);
 
-        if (streamContexts.length > 0)
+        if (initiatedFiles.length > 0)
         {
             logger.info("Waiting for transfer to " + target + " to complete");
-            StreamManager.get(target).waitForStreamCompletion();
+            StreamOutManager.get(target).waitForStreamCompletion();
             // (StreamManager will delete the streamed file on completion.)
             logger.info("Done with transfer to " + target);
         }

Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java (from r904925, incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java&r1=904925&r2=904927&rev=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java Sun Jan 31 00:23:19 2010
@@ -28,7 +28,6 @@
 import java.net.InetAddress;
 
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.StreamContextManager;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.SimpleCondition;
@@ -38,22 +37,22 @@
 /**
  * This class manages the streaming of multiple files one after the other.
 */
-public class StreamManager
+public class StreamOutManager
 {   
-    private static Logger logger = Logger.getLogger( StreamManager.class );
+    private static Logger logger = Logger.getLogger( StreamOutManager.class );
         
-    private static ConcurrentMap<InetAddress, StreamManager> streamManagers = new ConcurrentHashMap<InetAddress, StreamManager>();
+    private static ConcurrentMap<InetAddress, StreamOutManager> streamManagers = new ConcurrentHashMap<InetAddress, StreamOutManager>();
 
-    public static StreamManager get(InetAddress to)
+    public static StreamOutManager get(InetAddress to)
     {
-        StreamManager streamManager = streamManagers.get(to);
-        if (streamManager == null)
+        StreamOutManager manager = streamManagers.get(to);
+        if (manager == null)
         {
-            StreamManager possibleNew = new StreamManager(to);
-            if ((streamManager = streamManagers.putIfAbsent(to, possibleNew)) == null)
-                streamManager = possibleNew;
+            StreamOutManager possibleNew = new StreamOutManager(to);
+            if ((manager = streamManagers.putIfAbsent(to, possibleNew)) == null)
+                manager = possibleNew;
         }
-        return streamManager;
+        return manager;
     }
     
     private final List<File> files = new ArrayList<File>();
@@ -61,19 +60,19 @@
     private long totalBytes = 0L;
     private final SimpleCondition condition = new SimpleCondition();
     
-    private StreamManager(InetAddress to)
+    private StreamOutManager(InetAddress to)
     {
         this.to = to;
     }
     
-    public void addFilesToStream(StreamContextManager.StreamContext[] streamContexts)
+    public void addFilesToStream(InitiatedFile[] initiatedFiles)
     {
-        for (StreamContextManager.StreamContext streamContext : streamContexts)
+        for (InitiatedFile initiatedFile : initiatedFiles)
         {
             if (logger.isDebugEnabled())
-              logger.debug("Adding file " + streamContext.getTargetFile() + " to be streamed.");
-            files.add( new File( streamContext.getTargetFile() ) );
-            totalBytes += streamContext.getExpectedBytes();
+              logger.debug("Adding file " + initiatedFile.getTargetFile() + " to be streamed.");
+            files.add( new File( initiatedFile.getTargetFile() ) );
+            totalBytes += initiatedFile.getExpectedBytes();
         }
     }
     

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java?rev=904927&r1=904926&r2=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java Sun Jan 31 00:23:19 2010
@@ -27,7 +27,7 @@
 import java.util.Map;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.streaming.InitiatedFile;
 import org.apache.cassandra.streaming.StreamInitiateVerbHandler;
 
 import org.junit.Test;
@@ -37,12 +37,12 @@
     @Test
     public void testGetNewNames() throws IOException
     {
-        StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[3];
-        streamContexts[0] = new StreamContextManager.StreamContext("/baz/foo/Standard1-500-Data.db", 100, "Keyspace1");
-        streamContexts[1] = new StreamContextManager.StreamContext("/bar/foo/Standard1-500-Index.db", 100, "Keyspace1");
-        streamContexts[2] = new StreamContextManager.StreamContext("/bad/foo/Standard1-500-Filter.db", 100, "Keyspace1");
+        InitiatedFile[] initiatedFiles = new InitiatedFile[3];
+        initiatedFiles[0] = new InitiatedFile("/baz/foo/Standard1-500-Data.db", 100, "Keyspace1");
+        initiatedFiles[1] = new InitiatedFile("/bar/foo/Standard1-500-Index.db", 100, "Keyspace1");
+        initiatedFiles[2] = new InitiatedFile("/bad/foo/Standard1-500-Filter.db", 100, "Keyspace1");
         StreamInitiateVerbHandler bivh = new StreamInitiateVerbHandler();
-        Map<String, String> fileNames = bivh.getNewNames(streamContexts);
+        Map<String, String> fileNames = bivh.getNewNames(initiatedFiles);
         Map<String, String> paths = new HashMap<String, String>();
         for (String ssName : fileNames.keySet())
             paths.put(ssName, DatabaseDescriptor.getNextAvailableDataLocation());
@@ -52,8 +52,8 @@
         assertEquals(true, result.contains("Data.db"));
         assertEquals(1, fileNames.entrySet().size());
 
-        assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, paths, streamContexts[0])).getName().matches("Standard1-tmp-\\d+-Data.db"));
-        assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, paths, streamContexts[1])).getName().matches("Standard1-tmp-\\d+-Index.db"));
-        assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, paths, streamContexts[2])).getName().matches("Standard1-tmp-\\d+-Filter.db"));
+        assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, paths, initiatedFiles[0])).getName().matches("Standard1-tmp-\\d+-Data.db"));
+        assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, paths, initiatedFiles[1])).getName().matches("Standard1-tmp-\\d+-Index.db"));
+        assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, paths, initiatedFiles[2])).getName().matches("Standard1-tmp-\\d+-Filter.db"));
     }
 }