You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/31 01:29:38 UTC

svn commit: r904932 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/net/io/ src/java/org/apache/cassandra/service/ src/java/org/apac...

Author: jbellis
Date: Sun Jan 31 00:29:37 2010
New Revision: 904932

URL: http://svn.apache.org/viewvc?rev=904932&view=rev
Log:
move IncomingStreamReader, StreamInitiateMessage, and BootstrapTest to streaming package.  r/m 'public' modifier from streaming classes that don't need it
patch by jbellis; reviewed by stuhood for CASSANDRA-751

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java   (contents, props changed)
      - copied, changed from r904930, incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java   (contents, props changed)
      - copied, changed from r904930, incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java   (contents, props changed)
      - copied, changed from r904930, incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Sun Jan 31 00:29:37 2010
@@ -28,7 +28,6 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.StreamOut;
 
 import org.apache.log4j.Logger;
 
@@ -415,10 +414,10 @@
                 Hashtable<InetAddress, Long> copy = new Hashtable<InetAddress, Long>(justRemovedEndPoints_);
                 for (Map.Entry<InetAddress, Long> entry : copy.entrySet())
                 {
-                    if ((now - entry.getValue()) > StreamOut.RING_DELAY)
+                    if ((now - entry.getValue()) > StorageService.RING_DELAY)
                     {
                         if (logger_.isDebugEnabled())
-                            logger_.debug(StreamOut.RING_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over");
+                            logger_.debug(StorageService.RING_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over");
                         justRemovedEndPoints_.remove(entry.getKey());
                     }
                 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Sun Jan 31 00:29:37 2010
@@ -2,12 +2,10 @@
 
 import java.io.*;
 import java.net.Socket;
-import java.nio.ByteBuffer;
 
 import org.apache.log4j.Logger;
 
-import org.apache.cassandra.net.io.IncomingStreamReader;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.streaming.IncomingStreamReader;
 
 public class IncomingTcpConnection extends Thread
 {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Sun Jan 31 00:29:37 2010
@@ -35,7 +35,6 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.streaming.StreamOut;
 
 /*
  * The load balancing algorithm here is an implementation of
@@ -365,7 +364,7 @@
                 Thread.sleep(100);
             }
             // one more sleep in case there are some stragglers
-            Thread.sleep(StreamOut.RING_DELAY);
+            Thread.sleep(StorageService.RING_DELAY);
         }
         catch (InterruptedException e)
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Sun Jan 31 00:29:37 2010
@@ -62,6 +62,8 @@
 {
     private static Logger logger_ = Logger.getLogger(StorageService.class);     
 
+    public static final long RING_DELAY = 30 * 1000; // delay after which we assume ring has stabilized
+
     public final static String MOVE_STATE = "MOVE";
 
     // this must be a char that cannot be present in any token
@@ -316,10 +318,10 @@
         isBootstrapMode = true;
         SystemTable.updateToken(token); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
         Gossiper.instance.addApplicationState(MOVE_STATE, new ApplicationState(STATE_BOOTSTRAPPING + Delimiter + partitioner_.getTokenFactory().toString(token)));
-        logger_.info("bootstrap sleeping " + StreamOut.RING_DELAY);
+        logger_.info("bootstrap sleeping " + RING_DELAY);
         try
         {
-            Thread.sleep(StreamOut.RING_DELAY);
+            Thread.sleep(RING_DELAY);
         }
         catch (InterruptedException e)
         {
@@ -1265,8 +1267,8 @@
 
         logger_.info("DECOMMISSIONING");
         startLeaving();
-        logger_.info("decommission sleeping " + StreamOut.RING_DELAY);
-        Thread.sleep(StreamOut.RING_DELAY);
+        logger_.info("decommission sleeping " + RING_DELAY);
+        Thread.sleep(RING_DELAY);
 
         Runnable finishLeaving = new Runnable()
         {
@@ -1364,8 +1366,8 @@
 
         logger_.info("starting move. leaving token " + getLocalToken());
         startLeaving();
-        logger_.info("move sleeping " + StreamOut.RING_DELAY);
-        Thread.sleep(StreamOut.RING_DELAY);
+        logger_.info("move sleeping " + RING_DELAY);
+        Thread.sleep(RING_DELAY);
 
         Runnable finishMoving = new WrappedRunnable()
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java Sun Jan 31 00:29:37 2010
@@ -10,7 +10,7 @@
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
-public class CompletedFileStatus
+class CompletedFileStatus
 {
     private static ICompactSerializer<CompletedFileStatus> serializer_;
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java Sun Jan 31 00:29:37 2010
@@ -22,7 +22,7 @@
 
 import java.net.InetAddress;
 
-public interface IStreamComplete
+interface IStreamComplete
 {
     /*
      * This callback if registered with the StreamContextManager is 

Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (from r904930, incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java&r1=904930&r2=904932&rev=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Sun Jan 31 00:29:37 2010
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.net.io;
+package org.apache.cassandra.streaming;
 
 import java.net.InetSocketAddress;
 import java.net.InetAddress;
@@ -27,10 +27,6 @@
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.net.FileStreamTask;
-import org.apache.cassandra.streaming.CompletedFileStatus;
-import org.apache.cassandra.streaming.IStreamComplete;
-import org.apache.cassandra.streaming.InitiatedFile;
-import org.apache.cassandra.streaming.StreamInManager;
 
 public class IncomingStreamReader
 {

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java Sun Jan 31 00:29:37 2010
@@ -6,7 +6,7 @@
 
 import org.apache.cassandra.io.ICompactSerializer;
 
-public class InitiatedFile
+class InitiatedFile
 {
     private static ICompactSerializer<InitiatedFile> serializer_;
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java Sun Jan 31 00:29:37 2010
@@ -29,7 +29,7 @@
 
 import org.apache.log4j.Logger;
 
-public class StreamInManager
+class StreamInManager
 {
     private static final Logger logger = Logger.getLogger(StreamInManager.class);
 

Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java (from r904930, incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java&r1=904930&r2=904932&rev=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java Sun Jan 31 00:29:37 2010
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.dht;
+package org.apache.cassandra.streaming;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -29,7 +29,7 @@
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
-public class StreamInitiateMessage
+class StreamInitiateMessage
 {
     private static ICompactSerializer<StreamInitiateMessage> serializer_;
 
@@ -62,32 +62,31 @@
     {
         return streamContexts_;
     }
-}
 
-class StreamInitiateMessageSerializer implements ICompactSerializer<StreamInitiateMessage>
-{
-    public void serialize(StreamInitiateMessage bim, DataOutputStream dos) throws IOException
+    private static class StreamInitiateMessageSerializer implements ICompactSerializer<StreamInitiateMessage>
     {
-        dos.writeInt(bim.streamContexts_.length);
-        for ( InitiatedFile initiatedFile : bim.streamContexts_ )
+        public void serialize(StreamInitiateMessage bim, DataOutputStream dos) throws IOException
         {
-            InitiatedFile.serializer().serialize(initiatedFile, dos);
+            dos.writeInt(bim.streamContexts_.length);
+            for ( InitiatedFile initiatedFile : bim.streamContexts_ )
+            {
+                InitiatedFile.serializer().serialize(initiatedFile, dos);
+            }
         }
-    }
-    
-    public StreamInitiateMessage deserialize(DataInputStream dis) throws IOException
-    {
-        int size = dis.readInt();
-        InitiatedFile[] initiatedFiles = new InitiatedFile[0];
-        if ( size > 0 )
+
+        public StreamInitiateMessage deserialize(DataInputStream dis) throws IOException
         {
-            initiatedFiles = new InitiatedFile[size];
-            for ( int i = 0; i < size; ++i )
+            int size = dis.readInt();
+            InitiatedFile[] initiatedFiles = new InitiatedFile[0];
+            if ( size > 0 )
             {
-                initiatedFiles[i] = InitiatedFile.serializer().deserialize(dis);
+                initiatedFiles = new InitiatedFile[size];
+                for ( int i = 0; i < size; ++i )
+                {
+                    initiatedFiles[i] = InitiatedFile.serializer().deserialize(dis);
+                }
             }
+            return new StreamInitiateMessage(initiatedFiles);
         }
-        return new StreamInitiateMessage(initiatedFiles);
     }
 }
-

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java Sun Jan 31 00:29:37 2010
@@ -12,7 +12,7 @@
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Table;
-import org.apache.cassandra.dht.StreamInitiateMessage;
+import org.apache.cassandra.streaming.StreamInitiateMessage;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Sun Jan 31 00:29:37 2010
@@ -33,7 +33,7 @@
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.StreamInitiateMessage;
+import org.apache.cassandra.streaming.StreamInitiateMessage;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.io.SSTable;
@@ -60,7 +60,6 @@
     private static Logger logger = Logger.getLogger(StreamOut.class);
 
     static String TABLE_NAME = "STREAMING-TABLE-NAME";
-    public static final long RING_DELAY = 30 * 1000; // delay after which we assume ring has stablized
 
     /**
      * Split out files for all tables on disk locally for each range and then stream them to the target endpoint.

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java Sun Jan 31 00:29:37 2010
@@ -37,7 +37,7 @@
 /**
  * This class manages the streaming of multiple files one after the other.
 */
-public class StreamOutManager
+class StreamOutManager
 {   
     private static Logger logger = Logger.getLogger( StreamOutManager.class );
         

Copied: incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java (from r904930, incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java?p2=incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java&p1=incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java&r1=904930&r2=904932&rev=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java Sun Jan 31 00:29:37 2010
@@ -16,7 +16,7 @@
 * specific language governing permissions and limitations
 * under the License.
 */
-package org.apache.cassandra.dht;
+package org.apache.cassandra.streaming;
 
 import static junit.framework.Assert.assertEquals;
 import static org.junit.Assert.*;
@@ -27,8 +27,6 @@
 import java.util.Map;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.streaming.InitiatedFile;
-import org.apache.cassandra.streaming.StreamInitiateVerbHandler;
 
 import org.junit.Test;
 

Propchange: incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
------------------------------------------------------------------------------
    svn:eol-style = native