You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/31 01:30:20 UTC

svn commit: r904933 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/streaming/ test/unit/org/apache/cassandra/streaming/

Author: jbellis
Date: Sun Jan 31 00:30:19 2010
New Revision: 904933

URL: http://svn.apache.org/viewvc?rev=904933&view=rev
Log:
mv InitiatedFile -> PendingFile
patch by jbellis; reviewed by stuhood for CASSANDRA-751

Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java?rev=904933&r1=904932&r2=904933&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java Sun Jan 31 00:30:19 2010
@@ -28,5 +28,5 @@
      * This callback if registered with the StreamContextManager is 
      * called when the stream from a host is completely handled. 
     */
-    public void onStreamCompletion(InetAddress from, InitiatedFile initiatedFile, CompletedFileStatus streamStatus) throws IOException;
+    public void onStreamCompletion(InetAddress from, PendingFile pendingFile, CompletedFileStatus streamStatus) throws IOException;
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=904933&r1=904932&r2=904933&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Sun Jan 31 00:30:19 2010
@@ -31,7 +31,7 @@
 public class IncomingStreamReader
 {
     private static Logger logger = Logger.getLogger(IncomingStreamReader.class);
-    private InitiatedFile initiatedFile;
+    private PendingFile pendingFile;
     private CompletedFileStatus streamStatus;
     private SocketChannel socketChannel;
 
@@ -39,8 +39,8 @@
     {
         this.socketChannel = socketChannel;
         InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
-        initiatedFile = StreamInManager.getStreamContext(remoteAddress.getAddress());
-        assert initiatedFile != null;
+        pendingFile = StreamInManager.getStreamContext(remoteAddress.getAddress());
+        assert pendingFile != null;
         streamStatus = StreamInManager.getStreamStatus(remoteAddress.getAddress());
         assert streamStatus != null;
     }
@@ -49,14 +49,14 @@
     {
         InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
         if (logger.isDebugEnabled())
-          logger.debug("Creating file for " + initiatedFile.getTargetFile());
-        FileOutputStream fos = new FileOutputStream(initiatedFile.getTargetFile(), true);
+          logger.debug("Creating file for " + pendingFile.getTargetFile());
+        FileOutputStream fos = new FileOutputStream(pendingFile.getTargetFile(), true);
         FileChannel fc = fos.getChannel();
 
         long bytesRead = 0;
         try
         {
-            while (bytesRead < initiatedFile.getExpectedBytes())
+            while (bytesRead < pendingFile.getExpectedBytes())
                 bytesRead += fc.transferFrom(socketChannel, bytesRead, FileStreamTask.CHUNK_SIZE);
         }
         catch (IOException ex)
@@ -65,16 +65,16 @@
             streamStatus.setAction(CompletedFileStatus.StreamCompletionAction.STREAM);
             handleStreamCompletion(remoteAddress.getAddress());
             /* Delete the orphaned file. */
-            File file = new File(initiatedFile.getTargetFile());
+            File file = new File(pendingFile.getTargetFile());
             file.delete();
             throw ex;
         }
 
-        if (bytesRead == initiatedFile.getExpectedBytes())
+        if (bytesRead == pendingFile.getExpectedBytes())
         {
             if (logger.isDebugEnabled())
             {
-                logger.debug("Removing stream context " + initiatedFile);
+                logger.debug("Removing stream context " + pendingFile);
             }
             fc.close();
             handleStreamCompletion(remoteAddress.getAddress());
@@ -89,6 +89,6 @@
         */
         IStreamComplete streamComplete = StreamInManager.getStreamCompletionHandler(remoteHost);
         if (streamComplete != null)
-            streamComplete.onStreamCompletion(remoteHost, initiatedFile, streamStatus);
+            streamComplete.onStreamCompletion(remoteHost, pendingFile, streamStatus);
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java?rev=904933&r1=904932&r2=904933&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java Sun Jan 31 00:30:19 2010
@@ -24,27 +24,27 @@
 {
     private static Logger logger = Logger.getLogger(StreamCompletionHandler.class);
 
-    public void onStreamCompletion(InetAddress host, InitiatedFile initiatedFile, CompletedFileStatus streamStatus) throws IOException
+    public void onStreamCompletion(InetAddress host, PendingFile pendingFile, CompletedFileStatus streamStatus) throws IOException
     {
         /* Parse the stream context and the file to the list of SSTables in the associated Column Family Store. */
-        if (initiatedFile.getTargetFile().contains("-Data.db"))
+        if (pendingFile.getTargetFile().contains("-Data.db"))
         {
-            String tableName = initiatedFile.getTable();
-            File file = new File( initiatedFile.getTargetFile() );
+            String tableName = pendingFile.getTable();
+            File file = new File( pendingFile.getTargetFile() );
             String fileName = file.getName();
             String [] temp = fileName.split("-");
 
             //Open the file to see if all parts are now here
             try
             {
-                SSTableReader sstable = SSTableWriter.renameAndOpen(initiatedFile.getTargetFile());
+                SSTableReader sstable = SSTableWriter.renameAndOpen(pendingFile.getTargetFile());
                 //TODO add a sanity check that this sstable has all its parts and is ok
                 Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
                 logger.info("Streaming added " + sstable.getFilename());
             }
             catch (IOException e)
             {
-                throw new RuntimeException("Not able to add streamed file " + initiatedFile.getTargetFile(), e);
+                throw new RuntimeException("Not able to add streamed file " + pendingFile.getTargetFile(), e);
             }
         }
 
@@ -56,7 +56,7 @@
         /* If we're done with everything for this host, remove from bootstrap sources */
         if (StreamInManager.isDone(host) && StorageService.instance.isBootstrapMode())
         {
-            StorageService.instance.removeBootstrapSource(host, initiatedFile.getTable());
+            StorageService.instance.removeBootstrapSource(host, pendingFile.getTable());
         }
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java?rev=904933&r1=904932&r2=904933&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java Sun Jan 31 00:30:19 2010
@@ -18,13 +18,9 @@
 
 package org.apache.cassandra.streaming;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
 import java.util.*;
 import java.net.InetAddress;
 
-import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.streaming.IStreamComplete;
 
 import org.apache.log4j.Logger;
@@ -34,21 +30,21 @@
     private static final Logger logger = Logger.getLogger(StreamInManager.class);
 
     /* Maintain a stream context per host that is the source of the stream */
-    public static final Map<InetAddress, List<InitiatedFile>> ctxBag_ = new Hashtable<InetAddress, List<InitiatedFile>>();
+    public static final Map<InetAddress, List<PendingFile>> ctxBag_ = new Hashtable<InetAddress, List<PendingFile>>();
     /* Maintain in this map the status of the streams that need to be sent back to the source */
     public static final Map<InetAddress, List<CompletedFileStatus>> streamStatusBag_ = new Hashtable<InetAddress, List<CompletedFileStatus>>();
     /* Maintains a callback handler per endpoint to notify the app that a stream from a given endpoint has been handled */
     public static final Map<InetAddress, IStreamComplete> streamNotificationHandlers_ = new HashMap<InetAddress, IStreamComplete>();
     
-    public synchronized static InitiatedFile getStreamContext(InetAddress key)
+    public synchronized static PendingFile getStreamContext(InetAddress key)
     {        
-        List<InitiatedFile> context = ctxBag_.get(key);
+        List<PendingFile> context = ctxBag_.get(key);
         if ( context == null )
             throw new IllegalStateException("Streaming context has not been set for " + key);
-        InitiatedFile initiatedFile = context.remove(0);
+        PendingFile pendingFile = context.remove(0);
         if ( context.isEmpty() )
             ctxBag_.remove(key);
-        return initiatedFile;
+        return pendingFile;
     }
     
     public synchronized static CompletedFileStatus getStreamStatus(InetAddress key)
@@ -86,16 +82,16 @@
         streamNotificationHandlers_.put(key, streamComplete);
     }
     
-    public synchronized static void addStreamContext(InetAddress key, InitiatedFile initiatedFile, CompletedFileStatus streamStatus)
+    public synchronized static void addStreamContext(InetAddress key, PendingFile pendingFile, CompletedFileStatus streamStatus)
     {
         /* Record the stream context */
-        List<InitiatedFile> context = ctxBag_.get(key);
+        List<PendingFile> context = ctxBag_.get(key);
         if ( context == null )
         {
-            context = new ArrayList<InitiatedFile>();
+            context = new ArrayList<PendingFile>();
             ctxBag_.put(key, context);
         }
-        context.add(initiatedFile);
+        context.add(pendingFile);
         
         /* Record the stream status for this stream context */
         List<CompletedFileStatus> status = streamStatusBag_.get(key);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java?rev=904933&r1=904932&r2=904933&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java Sun Jan 31 00:30:19 2010
@@ -25,7 +25,7 @@
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.streaming.InitiatedFile;
+import org.apache.cassandra.streaming.PendingFile;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -51,14 +51,14 @@
         return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_INITIATE, bos.toByteArray() );
     }
     
-    protected InitiatedFile[] streamContexts_ = new InitiatedFile[0];
+    protected PendingFile[] streamContexts_ = new PendingFile[0];
    
-    public StreamInitiateMessage(InitiatedFile[] initiatedFiles)
+    public StreamInitiateMessage(PendingFile[] pendingFiles)
     {
-        streamContexts_ = initiatedFiles;
+        streamContexts_ = pendingFiles;
     }
     
-    public InitiatedFile[] getStreamContext()
+    public PendingFile[] getStreamContext()
     {
         return streamContexts_;
     }
@@ -68,25 +68,25 @@
         public void serialize(StreamInitiateMessage bim, DataOutputStream dos) throws IOException
         {
             dos.writeInt(bim.streamContexts_.length);
-            for ( InitiatedFile initiatedFile : bim.streamContexts_ )
+            for ( PendingFile pendingFile : bim.streamContexts_ )
             {
-                InitiatedFile.serializer().serialize(initiatedFile, dos);
+                PendingFile.serializer().serialize(pendingFile, dos);
             }
         }
 
         public StreamInitiateMessage deserialize(DataInputStream dis) throws IOException
         {
             int size = dis.readInt();
-            InitiatedFile[] initiatedFiles = new InitiatedFile[0];
+            PendingFile[] pendingFiles = new PendingFile[0];
             if ( size > 0 )
             {
-                initiatedFiles = new InitiatedFile[size];
+                pendingFiles = new PendingFile[size];
                 for ( int i = 0; i < size; ++i )
                 {
-                    initiatedFiles[i] = InitiatedFile.serializer().deserialize(dis);
+                    pendingFiles[i] = PendingFile.serializer().deserialize(dis);
                 }
             }
-            return new StreamInitiateMessage(initiatedFiles);
+            return new StreamInitiateMessage(pendingFiles);
         }
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=904933&r1=904932&r2=904933&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java Sun Jan 31 00:30:19 2010
@@ -41,9 +41,9 @@
         try
         {
             StreamInitiateMessage biMsg = StreamInitiateMessage.serializer().deserialize(new DataInputStream(bufIn));
-            InitiatedFile[] initiatedFiles = biMsg.getStreamContext();
+            PendingFile[] pendingFiles = biMsg.getStreamContext();
 
-            if (initiatedFiles.length == 0)
+            if (pendingFiles.length == 0)
             {
                 if (logger.isDebugEnabled())
                     logger.debug("no data needed from " + message.getFrom());
@@ -52,7 +52,7 @@
                 return;
             }
 
-            Map<String, String> fileNames = getNewNames(initiatedFiles);
+            Map<String, String> fileNames = getNewNames(pendingFiles);
             Map<String, String> pathNames = new HashMap<String, String>();
             for (String ssName : fileNames.keySet())
                 pathNames.put(ssName, DatabaseDescriptor.getNextAvailableDataLocation());
@@ -61,15 +61,15 @@
              * generate the new file names and store the new file names
              * in the StreamContextManager.
             */
-            for (InitiatedFile initiatedFile : initiatedFiles)
+            for (PendingFile pendingFile : pendingFiles)
             {
-                CompletedFileStatus streamStatus = new CompletedFileStatus(initiatedFile.getTargetFile(), initiatedFile.getExpectedBytes() );
-                String file = getNewFileNameFromOldContextAndNames(fileNames, pathNames, initiatedFile);
+                CompletedFileStatus streamStatus = new CompletedFileStatus(pendingFile.getTargetFile(), pendingFile.getExpectedBytes() );
+                String file = getNewFileNameFromOldContextAndNames(fileNames, pathNames, pendingFile);
 
                 if (logger.isDebugEnabled())
-                  logger.debug("Received Data from  : " + message.getFrom() + " " + initiatedFile.getTargetFile() + " " + file);
-                initiatedFile.setTargetFile(file);
-                addStreamContext(message.getFrom(), initiatedFile, streamStatus);
+                  logger.debug("Received Data from  : " + message.getFrom() + " " + pendingFile.getTargetFile() + " " + file);
+                pendingFile.setTargetFile(file);
+                addStreamContext(message.getFrom(), pendingFile, streamStatus);
             }
 
             StreamInManager.registerStreamCompletionHandler(message.getFrom(), new StreamCompletionHandler());
@@ -86,23 +86,23 @@
 
     public String getNewFileNameFromOldContextAndNames(Map<String, String> fileNames,
                                                        Map<String, String> pathNames,
-                                                       InitiatedFile initiatedFile)
+                                                       PendingFile pendingFile)
     {
-        File sourceFile = new File( initiatedFile.getTargetFile() );
+        File sourceFile = new File( pendingFile.getTargetFile() );
         String[] piece = FBUtilities.strip(sourceFile.getName(), "-");
         String cfName = piece[0];
         String ssTableNum = piece[1];
         String typeOfFile = piece[2];
 
-        String newFileNameExpanded = fileNames.get(initiatedFile.getTable() + "-" + cfName + "-" + ssTableNum);
-        String path = pathNames.get(initiatedFile.getTable() + "-" + cfName + "-" + ssTableNum);
+        String newFileNameExpanded = fileNames.get(pendingFile.getTable() + "-" + cfName + "-" + ssTableNum);
+        String path = pathNames.get(pendingFile.getTable() + "-" + cfName + "-" + ssTableNum);
         //Drop type (Data.db) from new FileName
         String newFileName = newFileNameExpanded.replace("Data.db", typeOfFile);
-        return path + File.separator + initiatedFile.getTable() + File.separator + newFileName;
+        return path + File.separator + pendingFile.getTable() + File.separator + newFileName;
     }
 
     // todo: this method needs to be private, or package at the very least for easy unit testing.
-    public Map<String, String> getNewNames(InitiatedFile[] initiatedFiles) throws IOException
+    public Map<String, String> getNewNames(PendingFile[] pendingFiles) throws IOException
     {
         /*
          * Mapping for each file with unique CF-i ---> new file name. For eg.
@@ -113,10 +113,10 @@
         Map<String, String> fileNames = new HashMap<String, String>();
         /* Get the distinct entries from StreamContexts i.e have one entry per Data/Index/Filter file set */
         Set<String> distinctEntries = new HashSet<String>();
-        for ( InitiatedFile initiatedFile : initiatedFiles)
+        for ( PendingFile pendingFile : pendingFiles)
         {
-            String[] pieces = FBUtilities.strip(new File(initiatedFile.getTargetFile()).getName(), "-");
-            distinctEntries.add(initiatedFile.getTable() + "-" + pieces[0] + "-" + pieces[1] );
+            String[] pieces = FBUtilities.strip(new File(pendingFile.getTargetFile()).getName(), "-");
+            distinctEntries.add(pendingFile.getTable() + "-" + pieces[0] + "-" + pieces[1] );
         }
 
         /* Generate unique file names per entry */
@@ -136,10 +136,10 @@
         return fileNames;
     }
 
-    private void addStreamContext(InetAddress host, InitiatedFile initiatedFile, CompletedFileStatus streamStatus)
+    private void addStreamContext(InetAddress host, PendingFile pendingFile, CompletedFileStatus streamStatus)
     {
         if (logger.isDebugEnabled())
-          logger.debug("Adding stream context " + initiatedFile + " for " + host + " ...");
-        StreamInManager.addStreamContext(host, initiatedFile, streamStatus);
+          logger.debug("Adding stream context " + pendingFile + " for " + host + " ...");
+        StreamInManager.addStreamContext(host, pendingFile, streamStatus);
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=904933&r1=904932&r2=904933&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Sun Jan 31 00:30:19 2010
@@ -119,28 +119,28 @@
      */
     public static void transferSSTables(InetAddress target, List<SSTableReader> sstables, String table) throws IOException
     {
-        InitiatedFile[] initiatedFiles = new InitiatedFile[SSTable.FILES_ON_DISK * sstables.size()];
+        PendingFile[] pendingFiles = new PendingFile[SSTable.FILES_ON_DISK * sstables.size()];
         int i = 0;
         for (SSTableReader sstable : sstables)
         {
             for (String filename : sstable.getAllFilenames())
             {
                 File file = new File(filename);
-                initiatedFiles[i++] = new InitiatedFile(file.getAbsolutePath(), file.length(), table);
+                pendingFiles[i++] = new PendingFile(file.getAbsolutePath(), file.length(), table);
             }
         }
         if (logger.isDebugEnabled())
-          logger.debug("Stream context metadata " + StringUtils.join(initiatedFiles, ", "));
+          logger.debug("Stream context metadata " + StringUtils.join(pendingFiles, ", "));
 
-        StreamOutManager.get(target).addFilesToStream(initiatedFiles);
-        StreamInitiateMessage biMessage = new StreamInitiateMessage(initiatedFiles);
+        StreamOutManager.get(target).addFilesToStream(pendingFiles);
+        StreamInitiateMessage biMessage = new StreamInitiateMessage(pendingFiles);
         Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage);
         message.addHeader(StreamOut.TABLE_NAME, table.getBytes());
         if (logger.isDebugEnabled())
           logger.debug("Sending a stream initiate message to " + target + " ...");
         MessagingService.instance.sendOneWay(message, target);
 
-        if (initiatedFiles.length > 0)
+        if (pendingFiles.length > 0)
         {
             logger.info("Waiting for transfer to " + target + " to complete");
             StreamOutManager.get(target).waitForStreamCompletion();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java?rev=904933&r1=904932&r2=904933&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java Sun Jan 31 00:30:19 2010
@@ -65,14 +65,14 @@
         this.to = to;
     }
     
-    public void addFilesToStream(InitiatedFile[] initiatedFiles)
+    public void addFilesToStream(PendingFile[] pendingFiles)
     {
-        for (InitiatedFile initiatedFile : initiatedFiles)
+        for (PendingFile pendingFile : pendingFiles)
         {
             if (logger.isDebugEnabled())
-              logger.debug("Adding file " + initiatedFile.getTargetFile() + " to be streamed.");
-            files.add( new File( initiatedFile.getTargetFile() ) );
-            totalBytes += initiatedFile.getExpectedBytes();
+              logger.debug("Adding file " + pendingFile.getTargetFile() + " to be streamed.");
+            files.add( new File( pendingFile.getTargetFile() ) );
+            totalBytes += pendingFile.getExpectedBytes();
         }
     }
     

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java?rev=904933&r1=904932&r2=904933&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java Sun Jan 31 00:30:19 2010
@@ -35,12 +35,12 @@
     @Test
     public void testGetNewNames() throws IOException
     {
-        InitiatedFile[] initiatedFiles = new InitiatedFile[3];
-        initiatedFiles[0] = new InitiatedFile("/baz/foo/Standard1-500-Data.db", 100, "Keyspace1");
-        initiatedFiles[1] = new InitiatedFile("/bar/foo/Standard1-500-Index.db", 100, "Keyspace1");
-        initiatedFiles[2] = new InitiatedFile("/bad/foo/Standard1-500-Filter.db", 100, "Keyspace1");
+        PendingFile[] pendingFiles = new PendingFile[3];
+        pendingFiles[0] = new PendingFile("/baz/foo/Standard1-500-Data.db", 100, "Keyspace1");
+        pendingFiles[1] = new PendingFile("/bar/foo/Standard1-500-Index.db", 100, "Keyspace1");
+        pendingFiles[2] = new PendingFile("/bad/foo/Standard1-500-Filter.db", 100, "Keyspace1");
         StreamInitiateVerbHandler bivh = new StreamInitiateVerbHandler();
-        Map<String, String> fileNames = bivh.getNewNames(initiatedFiles);
+        Map<String, String> fileNames = bivh.getNewNames(pendingFiles);
         Map<String, String> paths = new HashMap<String, String>();
         for (String ssName : fileNames.keySet())
             paths.put(ssName, DatabaseDescriptor.getNextAvailableDataLocation());
@@ -50,8 +50,8 @@
         assertEquals(true, result.contains("Data.db"));
         assertEquals(1, fileNames.entrySet().size());
 
-        assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, paths, initiatedFiles[0])).getName().matches("Standard1-tmp-\\d+-Data.db"));
-        assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, paths, initiatedFiles[1])).getName().matches("Standard1-tmp-\\d+-Index.db"));
-        assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, paths, initiatedFiles[2])).getName().matches("Standard1-tmp-\\d+-Filter.db"));
+        assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, paths, pendingFiles[0])).getName().matches("Standard1-tmp-\\d+-Data.db"));
+        assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, paths, pendingFiles[1])).getName().matches("Standard1-tmp-\\d+-Index.db"));
+        assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, paths, pendingFiles[2])).getName().matches("Standard1-tmp-\\d+-Filter.db"));
     }
 }