You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/06/24 16:26:46 UTC

svn commit: r1139326 - in /cassandra/branches/cassandra-0.8.1: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/streaming/ test/unit/org/apache/cassandra/stream...

Author: slebresne
Date: Fri Jun 24 14:26:45 2011
New Revision: 1139326

URL: http://svn.apache.org/viewvc?rev=1139326&view=rev
Log:
Reverting CASSANDRA-2280 for 0.8.1 release

Modified:
    cassandra/branches/cassandra-0.8.1/CHANGES.txt
    cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/db/Table.java
    cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamIn.java
    cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamOut.java
    cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
    cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
    cassandra/branches/cassandra-0.8.1/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
    cassandra/branches/cassandra-0.8.1/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java

Modified: cassandra/branches/cassandra-0.8.1/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/CHANGES.txt?rev=1139326&r1=1139325&r2=1139326&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.1/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8.1/CHANGES.txt Fri Jun 24 14:26:45 2011
@@ -35,14 +35,12 @@
  * throttle migration replay (CASSANDRA-2714)
  * optimize column serializer creation (CASSANDRA-2716)
  * Added support for making bootstrap retry if nodes flap (CASSANDRA-2644)
- * Added statusthrift to nodetool to report if thrift server is running
-   (CASSANDRA-2722)
+ * Added statusthrift to nodetool to report if thrift server is running (CASSANDRA-2722)
  * Fixed rows being cached if they do not exist (CASSANDRA-2723)
  * fix truncate/compaction race (CASSANDRA-2673)
  * Support passing tableName and cfName to RowCacheProviders (CASSANDRA-2702)
  * workaround large resultsets causing large allocation retention
    by nio sockets (CASSANDRA-2654)
- * restrict repair streaming to specific columnfamilies (CASSANDRA-2280)
  * fix nodetool ring use with Ec2Snitch (CASSANDRA-2733)
  * fix inconsistency window during bootstrap (CASSANDRA-833)
  * fix removing columns and subcolumns that are supressed by a row or

Modified: cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/db/Table.java?rev=1139326&r1=1139325&r2=1139326&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/db/Table.java Fri Jun 24 14:26:45 2011
@@ -253,7 +253,7 @@ public class Table
     }
     
     /**
-     * @return A list of open SSTableReaders
+     * @return A list of open SSTableReaders (TODO: ensure that the caller doesn't modify these).
      */
     public List<SSTableReader> getAllSSTables()
     {

Modified: cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java?rev=1139326&r1=1139325&r2=1139326&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java Fri Jun 24 14:26:45 2011
@@ -58,9 +58,7 @@ import org.cliffc.high_scale_lib.NonBloc
 public final class MessagingService implements MessagingServiceMBean
 {
     public static final int VERSION_07 = 1;
-    public static final int VERSION_080 = 2;
-    public static final int version_ = 81;
-
+    public static final int version_ = 2;
     //TODO: make this parameter dynamic somehow.  Not sure if config is appropriate.
     private SerializerType serializerType_ = SerializerType.BINARY;
 

Modified: cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1139326&r1=1139325&r2=1139326&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/service/AntiEntropyService.java Fri Jun 24 14:26:45 2011
@@ -494,7 +494,7 @@ public class AntiEntropyService
                 StreamOutSession outsession = StreamOutSession.create(request.cf.left, request.endpoint, callback);
                 StreamOut.transferSSTables(outsession, sstables, differences, OperationType.AES);
                 // request ranges from the remote node
-                StreamIn.requestRanges(request.endpoint, request.cf.left, Collections.singletonList(cfstore), differences, callback, OperationType.AES);
+                StreamIn.requestRanges(request.endpoint, request.cf.left, differences, callback, OperationType.AES);
             }
             catch(Exception e)
             {

Modified: cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/service/StorageService.java?rev=1139326&r1=1139325&r2=1139326&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/service/StorageService.java Fri Jun 24 14:26:45 2011
@@ -2326,7 +2326,7 @@ public class StorageService implements I
                     public void run()
                     {
                         // TODO each call to transferRanges re-flushes, this is potentially a lot of waste
-                        StreamOut.transferRanges(newEndpoint, Table.open(table), Arrays.asList(range), callback, OperationType.UNBOOTSTRAP);
+                        StreamOut.transferRanges(newEndpoint, table, Arrays.asList(range), callback, OperationType.UNBOOTSTRAP);
                     }
                 });
             }

Modified: cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=1139326&r1=1139325&r2=1139326&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamIn.java Fri Jun 24 14:26:45 2011
@@ -24,11 +24,7 @@ package org.apache.cassandra.streaming;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.ArrayList;
-import java.util.List;
 
-import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -51,29 +47,22 @@ public class StreamIn
 {
     private static Logger logger = LoggerFactory.getLogger(StreamIn.class);
 
-    /** Request ranges for all column families in the given keyspace. */
-    public static void requestRanges(InetAddress source, String tableName, Collection<Range> ranges, Runnable callback, OperationType type)
-    {
-        requestRanges(source, tableName, Table.open(tableName).getColumnFamilyStores(), ranges, callback, type);
-    }
-
     /**
-     * Request ranges to be transferred from specific CFs
+     * Request ranges to be transferred from source to local node
      */
-    public static void requestRanges(InetAddress source, String tableName, Collection<ColumnFamilyStore> columnFamilies, Collection<Range> ranges, Runnable callback, OperationType type)
+    public static void requestRanges(InetAddress source, String tableName, Collection<Range> ranges, Runnable callback, OperationType type)
     {
         assert ranges.size() > 0;
 
         if (logger.isDebugEnabled())
             logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", "));
         StreamInSession session = StreamInSession.create(source, callback);
-        StreamRequestMessage srm = new StreamRequestMessage(FBUtilities.getLocalAddress(),
-                                                            ranges,
-                                                            tableName,
-                                                            columnFamilies,
-                                                            session.getSessionId(),
-                                                            type);
-        Message message = srm.getMessage(Gossiper.instance.getVersion(source));
+        Message message = new StreamRequestMessage(FBUtilities.getLocalAddress(), 
+                                                   ranges, 
+                                                   tableName, 
+                                                   session.getSessionId(), 
+                                                   type)
+                .getMessage(Gossiper.instance.getVersion(source));
         MessagingService.instance().sendOneWay(message, source);
     }
 
@@ -89,5 +78,5 @@ public class StreamIn
         Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath(remote.size, remote.desc.version));
 
         return new PendingFile(localdesc, remote);
-    }
+     }
 }

Modified: cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1139326&r1=1139325&r2=1139326&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamOut.java Fri Jun 24 14:26:45 2011
@@ -22,21 +22,21 @@ package org.apache.cassandra.streaming;
 import java.io.IOError;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-import com.google.common.collect.Iterables;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -65,48 +65,74 @@ public class StreamOut
     private static Logger logger = LoggerFactory.getLogger(StreamOut.class);
 
     /**
-     * Stream the given ranges to the target endpoint from each CF in the given keyspace.
+     * Split out files for all tables on disk locally for each range and then stream them to the target endpoint.
     */
-    public static void transferRanges(InetAddress target, Table table, Collection<Range> ranges, Runnable callback, OperationType type)
+    public static void transferRanges(InetAddress target, String tableName, Collection<Range> ranges, Runnable callback, OperationType type)
     {
-        StreamOutSession session = StreamOutSession.create(table.name, target, callback);
-        transferRanges(session, table.getColumnFamilyStores(), ranges, type);
+        assert ranges.size() > 0;
+        
+        // this is so that this target shows up as a destination while anticompaction is happening.
+        StreamOutSession session = StreamOutSession.create(tableName, target, callback);
+
+        logger.info("Beginning transfer to {}", target);
+        logger.debug("Ranges are {}", StringUtils.join(ranges, ","));
+
+        try
+        {
+            Table table = flushSSTable(tableName);
+            // send the matching portion of every sstable in the keyspace
+            transferSSTables(session, table.getAllSSTables(), ranges, type);
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
     }
 
     /**
-     * Flushes matching column families from the given keyspace, or all columnFamilies
-     * if the cf list is empty.
+     * (1) dump all the memtables to disk.
+     * (2) determine the minimal file sections we need to send for the given ranges
+     * (3) transfer the data.
      */
-    private static void flushSSTables(Iterable<ColumnFamilyStore> stores) throws IOException
+    private static Table flushSSTable(String tableName) throws IOException
     {
-        logger.info("Flushing memtables for {}...", stores);
-        List<Future<?>> flushes;
-        flushes = new ArrayList<Future<?>>();
-        for (ColumnFamilyStore cfstore : stores)
-        {
-            Future<?> flush = cfstore.forceFlush();
-            if (flush != null)
-                flushes.add(flush);
+        Table table = Table.open(tableName);
+        logger.info("Flushing memtables for {}...", tableName);
+        for (Future<?> f : table.flush())
+        {
+            try
+            {
+                f.get();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (ExecutionException e)
+            {
+                throw new RuntimeException(e);
+            }
         }
-        FBUtilities.waitOnFutures(flushes);
+        return table;
     }
 
     /**
-     * Stream the given ranges to the target endpoint from each of the given CFs.
+     * Split out files for all tables on disk locally for each range and then stream them to the target endpoint.
     */
-    public static void transferRanges(StreamOutSession session, Iterable<ColumnFamilyStore> cfses, Collection<Range> ranges, OperationType type)
+    public static void transferRangesForRequest(StreamOutSession session, Collection<Range> ranges, OperationType type)
     {
         assert ranges.size() > 0;
 
         logger.info("Beginning transfer to {}", session.getHost());
         logger.debug("Ranges are {}", StringUtils.join(ranges, ","));
+
         try
         {
-            flushSSTables(cfses);
-            Iterable<SSTableReader> sstables = Collections.emptyList();
-            for (ColumnFamilyStore cfStore : cfses)
-                sstables = Iterables.concat(sstables, cfStore.getSSTables());
-            transferSSTables(session, sstables, ranges, type);
+            Table table = flushSSTable(session.table);
+            // send the matching portion of every sstable in the keyspace
+            List<PendingFile> pending = createPendingFiles(table.getAllSSTables(), ranges, type);
+            session.addFilesToStream(pending);
+            session.begin();
         }
         catch (IOException e)
         {
@@ -115,10 +141,9 @@ public class StreamOut
     }
 
     /**
-     * Low-level transfer of matching portions of a group of sstables from a single table to the target endpoint.
-     * You should probably call transferRanges instead.
+     * Transfers matching portions of a group of sstables from a single table to the target endpoint.
      */
-    public static void transferSSTables(StreamOutSession session, Iterable<SSTableReader> sstables, Collection<Range> ranges, OperationType type) throws IOException
+    public static void transferSSTables(StreamOutSession session, Collection<SSTableReader> sstables, Collection<Range> ranges, OperationType type) throws IOException
     {
         List<PendingFile> pending = createPendingFiles(sstables, ranges, type);
 
@@ -129,7 +154,7 @@ public class StreamOut
     }
 
     // called prior to sending anything.
-    private static List<PendingFile> createPendingFiles(Iterable<SSTableReader> sstables, Collection<Range> ranges, OperationType type)
+    private static List<PendingFile> createPendingFiles(Collection<SSTableReader> sstables, Collection<Range> ranges, OperationType type)
     {
         List<PendingFile> pending = new ArrayList<PendingFile>();
         for (SSTableReader sstable : sstables)
@@ -140,7 +165,7 @@ public class StreamOut
                 continue;
             pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type));
         }
-        logger.info("Stream context metadata {}, {} sstables.", pending, Iterables.size(sstables));
+        logger.info("Stream context metadata {}, {} sstables.", pending, sstables.size());
         return pending;
     }
 }

Modified: cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=1139326&r1=1139325&r2=1139326&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java (original)
+++ cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java Fri Jun 24 14:26:45 2011
@@ -23,13 +23,10 @@ package org.apache.cassandra.streaming;
 
 import java.io.*;
 import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.ICompactSerializer;
@@ -69,15 +66,13 @@ class StreamRequestMessage implements Me
     // if these are specified, file shoud not be.
     protected final Collection<Range> ranges;
     protected final String table;
-    protected final Iterable<ColumnFamilyStore> columnFamilies;
     protected final OperationType type;
 
-    StreamRequestMessage(InetAddress target, Collection<Range> ranges, String table, Iterable<ColumnFamilyStore> columnFamilies, long sessionId, OperationType type)
+    StreamRequestMessage(InetAddress target, Collection<Range> ranges, String table, long sessionId, OperationType type)
     {
         this.target = target;
         this.ranges = ranges;
         this.table = table;
-        this.columnFamilies = columnFamilies;
         this.sessionId = sessionId;
         this.type = type;
         file = null;
@@ -91,7 +86,6 @@ class StreamRequestMessage implements Me
         this.type = file.type;
         ranges = null;
         table = null;
-        columnFamilies = null;
     }
     
     public Message getMessage(Integer version)
@@ -116,8 +110,6 @@ class StreamRequestMessage implements Me
         {
             sb.append(table);
             sb.append("@");
-            sb.append(columnFamilies.toString());
-            sb.append("@");
             sb.append(target);
             sb.append("------->");
             for ( Range range : ranges )
@@ -154,16 +146,8 @@ class StreamRequestMessage implements Me
                 {
                     AbstractBounds.serializer().serialize(range, dos);
                 }
-
                 if (version > MessagingService.VERSION_07)
                     dos.writeUTF(srm.type.name());
-
-                if (version > MessagingService.VERSION_080)
-                {
-                    dos.writeInt(Iterables.size(srm.columnFamilies));
-                    for (ColumnFamilyStore cfs : srm.columnFamilies)
-                        dos.writeInt(cfs.metadata.cfId);
-                }
             }
         }
 
@@ -189,16 +173,7 @@ class StreamRequestMessage implements Me
                 OperationType type = OperationType.RESTORE_REPLICA_COUNT;
                 if (version > MessagingService.VERSION_07)
                     type = OperationType.valueOf(dis.readUTF());
-
-                List<ColumnFamilyStore> stores = new ArrayList<ColumnFamilyStore>();
-                if (version > MessagingService.VERSION_080)
-                {
-                    int cfsSize = dis.readInt();
-                    for (int i = 0; i < cfsSize; ++i)
-                        stores.add(Table.open(table).getColumnFamilyStore(dis.readInt()));
-                }
-
-                return new StreamRequestMessage(target, ranges, table, stores, sessionId, type);
+                return new StreamRequestMessage(target, ranges, table, sessionId, type);
             }
         }
     }

Modified: cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=1139326&r1=1139325&r2=1139326&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java Fri Jun 24 14:26:45 2011
@@ -51,7 +51,7 @@ public class StreamRequestVerbHandler im
                 logger.debug(srm.toString());
 
             StreamOutSession session = StreamOutSession.create(srm.table, message.getFrom(), srm.sessionId);
-            StreamOut.transferRanges(session, srm.columnFamilies, srm.ranges, srm.type);
+            StreamOut.transferRangesForRequest(session, srm.ranges, srm.type);
         }
         catch (IOException ex)
         {

Modified: cassandra/branches/cassandra-0.8.1/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/test/unit/org/apache/cassandra/streaming/SerializationsTest.java?rev=1139326&r1=1139325&r2=1139326&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.1/test/unit/org/apache/cassandra/streaming/SerializationsTest.java (original)
+++ cassandra/branches/cassandra-0.8.1/test/unit/org/apache/cassandra/streaming/SerializationsTest.java Fri Jun 24 14:26:45 2011
@@ -22,7 +22,6 @@ package org.apache.cassandra.streaming;
 
 
 import org.apache.cassandra.AbstractSerializationsTester;
-import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.filter.QueryPath;
@@ -42,7 +41,9 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 
 public class SerializationsTest extends AbstractSerializationsTester
 {
@@ -145,8 +146,7 @@ public class SerializationsTest extends 
         Collection<Range> ranges = new ArrayList<Range>();
         for (int i = 0; i < 5; i++)
             ranges.add(new Range(new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i))), new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i+5)))));
-        List<ColumnFamilyStore> stores = Collections.singletonList(Table.open("Keyspace1").getColumnFamilyStore("Standard1"));
-        StreamRequestMessage msg0 = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, "Keyspace1", stores, 123L, OperationType.RESTORE_REPLICA_COUNT);
+        StreamRequestMessage msg0 = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, "Keyspace1", 123L, OperationType.RESTORE_REPLICA_COUNT);
         StreamRequestMessage msg1 = new StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(true, 100, OperationType.BOOTSTRAP), 124L);
         StreamRequestMessage msg2 = new StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(false, 100, OperationType.BOOTSTRAP), 124L);
 

Modified: cassandra/branches/cassandra-0.8.1/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1139326&r1=1139325&r2=1139326&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.1/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/branches/cassandra-0.8.1/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Fri Jun 24 14:26:45 2011
@@ -23,10 +23,12 @@ import static junit.framework.Assert.ass
 import static org.apache.cassandra.Util.column;
 
 import java.net.InetAddress;
+import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;