You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/07 23:51:06 UTC

svn commit: r888170 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/net/io/ test/unit/org/apache/cassandra/io/

Author: jbellis
Date: Mon Dec  7 22:51:05 2009
New Revision: 888170

URL: http://svn.apache.org/viewvc?rev=888170&view=rev
Log:
cleanup Streaming and rename transferOneTable -> transferSSTables.  patch by Stu Hood; reviewed by jbellis for CASSANDRA-520

Added:
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java   (with props)
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=888170&r1=888169&r2=888170&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java Mon Dec  7 22:51:05 2009
@@ -51,7 +51,7 @@
     public static final long RING_DELAY = 30 * 1000; // delay after which we assume ring has stablized
 
     /**
-     * split out files on disk locally for each range and then stream them to the target endpoint
+     * Split out files for all tables on disk locally for each range and then stream them to the target endpoint.
     */
     public static void transferRanges(InetAddress target, Collection<Range> ranges, Runnable callback)
     {
@@ -77,7 +77,7 @@
                 if (logger.isDebugEnabled())
                   logger.debug("Performing anticompaction ...");
                 /* Get the list of files that need to be streamed */
-                transferOneTable(target, table.forceAntiCompaction(ranges, target), tName); // SSTR GC deletes the file when done
+                transferSSTables(target, table.forceAntiCompaction(ranges, target), tName); // SSTR GC deletes the file when done
             }
             catch (IOException e)
             {
@@ -88,7 +88,11 @@
             callback.run();
     }
 
-    private static void transferOneTable(InetAddress target, List<SSTableReader> sstables, String table) throws IOException
+    /**
+     * Transfers a group of sstables from a single table to the target endpoint
+     * and then marks them as ready for local deletion.
+     */
+    public static void transferSSTables(InetAddress target, List<SSTableReader> sstables, String table) throws IOException
     {
         StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[SSTable.FILES_ON_DISK * sstables.size()];
         int i = 0;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=888170&r1=888169&r2=888170&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Mon Dec  7 22:51:05 2009
@@ -53,7 +53,6 @@
             connection = new TcpConnection(from_, to_);
             File file = new File(file_);             
             connection.stream(file, startPosition_, total_);
-            MessagingService.setStreamingMode(false);
             if (logger_.isDebugEnabled())
               logger_.debug("Done streaming " + file);
         }            

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=888170&r1=888169&r2=888170&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Dec  7 22:51:05 2009
@@ -57,8 +57,6 @@
     };
     
     private static Map<String, String> reservedVerbs_ = new Hashtable<String, String>();
-    /* Indicate if we are currently streaming data to another node or receiving streaming data */
-    private static AtomicBoolean isStreaming_ = new AtomicBoolean(false);
     
     /* This records all the results mapped by message Id */
     private static ICachetable<String, IAsyncCallback> callbackMap_;
@@ -485,17 +483,11 @@
 
     public void stream(String file, long startPosition, long total, InetAddress from, InetAddress to)
     {
-        isStreaming_.set(true);
         /* Streaming asynchronously on streamExector_ threads. */
         Runnable streamingTask = new FileStreamTask(file, startPosition, total, from, to);
         streamExecutor_.execute(streamingTask);
     }
 
-    public static void setStreamingMode(boolean bVal)
-    {
-        isStreaming_.set(bVal);
-    }
-
     public static void shutdown()
     {
         logger_.info("Shutting down ...");

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=888170&r1=888169&r2=888170&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Mon Dec  7 22:51:05 2009
@@ -459,7 +459,6 @@
                     }
                     else
                     {
-                        MessagingService.setStreamingMode(false);
                         /* Close this socket connection  used for streaming */
                         closeSocket();
                     }                    

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java?rev=888170&r1=888169&r2=888170&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java Mon Dec  7 22:51:05 2009
@@ -50,9 +50,6 @@
         int stream = MessagingService.getBits(pH, 3, 1);
         stream_.getProtocolHeader().isStreamingMode_ = (stream == 1) ? true : false;
         
-        if ( stream_.getProtocolHeader().isStreamingMode_ )
-            MessagingService.setStreamingMode(true);
-        
         int version = MessagingService.getBits(pH, 15, 8);
         stream_.getProtocolHeader().version_ = version;
         

Added: incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java?rev=888170&view=auto
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java (added)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java Mon Dec  7 22:51:05 2009
@@ -0,0 +1,64 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.*;
+
+import java.net.InetAddress;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.RangeReply;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.SSTableUtils;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import org.junit.Test;
+
+public class StreamingTest extends CleanupHelper
+{
+    public static final InetAddress LOCAL = FBUtilities.getLocalAddress();
+
+    @Test
+    public void testTransferTable() throws Exception
+    {
+        StorageService.instance().start();
+
+        // write a temporary SSTable, but don't register it
+        Set<String> content = new HashSet<String>();
+        content.add("key");
+        SSTableReader sstable = SSTableUtils.writeSSTable(content);
+        String tablename = sstable.getTableName();
+        String cfname = sstable.getColumnFamilyName();
+
+        // transfer
+        Streaming.transferSSTables(LOCAL, Arrays.asList(sstable), tablename);
+
+        // confirm that the SSTable was transferred and registered
+        ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname);
+        RangeReply rr = cfstore.getKeyRange("", "", 2);
+        assert rr.keys.size() == 1;
+        assert rr.keys.contains("key");
+    }
+}

Propchange: incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native