You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/09 17:36:33 UTC

svn commit: r888864 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: dht/ io/ service/

Author: jbellis
Date: Wed Dec  9 16:36:31 2009
New Revision: 888864

URL: http://svn.apache.org/viewvc?rev=888864&view=rev
Log:
Rename BootstrapMetadata and friends to StreamRequest*, as that is what they essentially are. Move them to cassandra.io, where the rest of the streaming code resides.  patch by Jaakko Laine; reviewed by jbellis for CASSANDRA-564

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java   (with props)
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java   (contents, props changed)
      - copied, changed from r888712, incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java   (contents, props changed)
      - copied, changed from r888712, incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=888864&r1=888863&r2=888864&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Wed Dec  9 16:36:31 2009
@@ -33,21 +33,13 @@
  import org.apache.cassandra.locator.TokenMetadata;
  import org.apache.cassandra.locator.AbstractReplicationStrategy;
  import org.apache.cassandra.net.*;
- import org.apache.cassandra.net.io.StreamContextManager;
- import org.apache.cassandra.net.io.IStreamComplete;
  import org.apache.cassandra.service.StorageService;
- import org.apache.cassandra.service.StreamManager;
- import org.apache.cassandra.utils.LogUtil;
  import org.apache.cassandra.utils.SimpleCondition;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.gms.FailureDetector;
  import org.apache.cassandra.gms.IFailureDetector;
- import org.apache.cassandra.io.DataInputBuffer;
- import org.apache.cassandra.io.SSTableReader;
- import org.apache.cassandra.io.SSTableWriter;
- import org.apache.cassandra.db.ColumnFamilyStore;
- import org.apache.cassandra.db.Table;
+ import org.apache.cassandra.io.Streaming;
  import com.google.common.collect.Multimap;
  import com.google.common.collect.ArrayListMultimap;
 
@@ -56,7 +48,7 @@
   * This class handles the bootstrapping responsibilities for the local endpoint.
   *
   *  - bootstrapTokenVerb asks the most-loaded node what Token to use to split its Range in two.
-  *  - bootstrapMetadataVerb tells source nodes to send us the necessary Ranges
+  *  - streamRequestVerb tells source nodes to send us the necessary Ranges
   *  - source nodes send streamInitiateVerb to us to say "get ready to receive data" [if there is data to send]
   *  - when we have everything set up to receive the data, we send streamInitiateDoneVerb back to the source nodes and they start streaming
   *  - when streaming is complete, we send streamFinishedVerb to the source so it can clean up on its end
@@ -96,12 +88,10 @@
                 for (Map.Entry<InetAddress, Collection<Range>> entry : getWorkMap(rangesWithSourceTarget).asMap().entrySet())
                 {
                     InetAddress source = entry.getKey();
-                    if (logger.isDebugEnabled())
-                        logger.debug("Sending BootstrapMetadataMessage to " + source + " for " + StringUtils.join(entry.getValue(), ", "));
-                    BootstrapMetadata bsMetadata = new BootstrapMetadata(address, entry.getValue());
-                    Message message = BootstrapMetadataMessage.makeBootstrapMetadataMessage(new BootstrapMetadataMessage(bsMetadata));
-                    MessagingService.instance().sendOneWay(message, source);
                     StorageService.instance().addBootstrapSource(source);
+                    if (logger.isDebugEnabled())
+                        logger.debug("Requesting from " + source + " ranges " + StringUtils.join(entry.getValue(), ", "));
+                    Streaming.requestRanges(source, entry.getValue());
                 }
             }
         }).start();

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java?rev=888864&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java Wed Dec  9 16:36:31 2009
@@ -0,0 +1,93 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.concurrent.StageManager;
+
+ /**
+ * This class encapsulates the message that needs to be sent to nodes
+ * that hand off data. The message contains information about the ranges
+ * that need to be transferred and the target node.
+*/
+class StreamRequestMessage
+{
+    private static ICompactSerializer<StreamRequestMessage> serializer_;
+    static
+    {
+        serializer_ = new StreamRequestMessageSerializer();
+    }
+    
+    protected static ICompactSerializer<StreamRequestMessage> serializer()
+    {
+        return serializer_;
+    }
+    
+    protected static Message makeStreamRequestMessage(StreamRequestMessage streamRequestMessage)
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        try
+        {
+            StreamRequestMessage.serializer().serialize(streamRequestMessage, dos);
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+        return new Message(FBUtilities.getLocalAddress(), StageManager.streamStage_, StorageService.streamRequestVerbHandler_, bos.toByteArray() );
+    }        
+    
+    protected StreamRequestMetadata[] streamRequestMetadata_ = new StreamRequestMetadata[0];
+
+    // TODO only actually ever need one StreamRequestMetadata, not an array
+    StreamRequestMessage(StreamRequestMetadata... streamRequestMetadata)
+    {
+        assert streamRequestMetadata != null;
+        streamRequestMetadata_ = streamRequestMetadata;
+    }
+}
+
+class StreamRequestMessageSerializer implements ICompactSerializer<StreamRequestMessage>
+{
+    public void serialize(StreamRequestMessage streamRequestMessage, DataOutputStream dos) throws IOException
+    {
+        StreamRequestMetadata[] streamRequestMetadata = streamRequestMessage.streamRequestMetadata_;
+        dos.writeInt(streamRequestMetadata.length);
+        for (StreamRequestMetadata bsmd : streamRequestMetadata)
+        {
+            StreamRequestMetadata.serializer().serialize(bsmd, dos);
+        }
+    }
+
+    public StreamRequestMessage deserialize(DataInputStream dis) throws IOException
+    {            
+        int size = dis.readInt();
+        StreamRequestMetadata[] streamRequestMetadata = new StreamRequestMetadata[size];
+        for ( int i = 0; i < size; ++i )
+        {
+            streamRequestMetadata[i] = StreamRequestMetadata.serializer().deserialize(dis);
+        }
+        return new StreamRequestMessage(streamRequestMetadata);
+    }
+}

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java (from r888712, incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java&r1=888712&r2=888864&rev=888864&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java Wed Dec  9 16:36:31 2009
@@ -16,37 +16,34 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.dht;
+package org.apache.cassandra.io;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Collection;
 
-import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-import java.net.InetAddress;
-
-
+import org.apache.cassandra.dht.Range;
 
 /**
- * This encapsulates information of the list of 
- * ranges that a target node requires in order to 
- * be bootstrapped. This will be bundled in a 
- * BootstrapMetadataMessage and sent to nodes that
- * are going to handoff the data.
+ * This encapsulates the list of ranges that a target node requires
+ * to be transferred. It is bundled in a StreamRequestMessage and
+ * sent to the nodes that are going to hand off
+ * the data.
 */
-class BootstrapMetadata
+class StreamRequestMetadata
 {
-    private static ICompactSerializer<BootstrapMetadata> serializer_;
+    private static ICompactSerializer<StreamRequestMetadata> serializer_;
     static
     {
-        serializer_ = new BootstrapMetadataSerializer();
+        serializer_ = new StreamRequestMetadataSerializer();
     }
     
-    protected static ICompactSerializer<BootstrapMetadata> serializer()
+    protected static ICompactSerializer<StreamRequestMetadata> serializer()
     {
         return serializer_;
     }
@@ -54,7 +51,7 @@
     protected InetAddress target_;
     protected Collection<Range> ranges_;
     
-    BootstrapMetadata(InetAddress target, Collection<Range> ranges)
+    StreamRequestMetadata(InetAddress target, Collection<Range> ranges)
     {
         target_ = target;
         ranges_ = ranges;
@@ -74,19 +71,19 @@
     }
 }
 
-class BootstrapMetadataSerializer implements ICompactSerializer<BootstrapMetadata>
+class StreamRequestMetadataSerializer implements ICompactSerializer<StreamRequestMetadata>
 {
-    public void serialize(BootstrapMetadata bsMetadata, DataOutputStream dos) throws IOException
+    public void serialize(StreamRequestMetadata srMetadata, DataOutputStream dos) throws IOException
     {
-        CompactEndPointSerializationHelper.serialize(bsMetadata.target_, dos);
-        dos.writeInt(bsMetadata.ranges_.size());
-        for (Range range : bsMetadata.ranges_)
+        CompactEndPointSerializationHelper.serialize(srMetadata.target_, dos);
+        dos.writeInt(srMetadata.ranges_.size());
+        for (Range range : srMetadata.ranges_)
         {
             Range.serializer().serialize(range, dos);
         }
     }
 
-    public BootstrapMetadata deserialize(DataInputStream dis) throws IOException
+    public StreamRequestMetadata deserialize(DataInputStream dis) throws IOException
     {            
         InetAddress target = CompactEndPointSerializationHelper.deserialize(dis);
         int size = dis.readInt();
@@ -95,7 +92,7 @@
         {
             ranges.add(Range.serializer().deserialize(dis));
         }            
-        return new BootstrapMetadata( target, ranges );
+        return new StreamRequestMetadata( target, ranges );
     }
 }
 

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java (from r888712, incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java&r1=888712&r2=888864&rev=888864&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java Wed Dec  9 16:36:31 2009
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.dht;
+package org.apache.cassandra.io;
 
 import java.io.File;
 import java.io.IOException;
@@ -27,9 +27,6 @@
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Table;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.io.Streaming;
 
 import java.net.InetAddress;
 import org.apache.cassandra.net.IVerbHandler;
@@ -43,34 +40,31 @@
 import org.apache.commons.lang.StringUtils;
 
  /**
- * This verb handler handles the BootstrapMetadataMessage that is sent
- * by the leader to the nodes that are responsible for handing off data. 
+ * This verb handler handles the StreamRequestMessage that is sent by
+ * the node requesting range transfer.
 */
-public class BootstrapMetadataVerbHandler implements IVerbHandler
+public class StreamRequestVerbHandler implements IVerbHandler
 {
-    private static Logger logger_ = Logger.getLogger(BootstrapMetadataVerbHandler.class);
+    private static Logger logger_ = Logger.getLogger(StreamRequestVerbHandler.class);
     
     public void doVerb(Message message)
     {
         if (logger_.isDebugEnabled())
-          logger_.debug("Received a BootstrapMetadataMessage from " + message.getFrom());
-        
-        /* Cannot bootstrap another node if I'm in bootstrap mode myself! */
-        assert !StorageService.instance().isBootstrapMode();
+            logger_.debug("Received a StreamRequestMessage from " + message.getFrom());
         
         byte[] body = message.getMessageBody();
         DataInputBuffer bufIn = new DataInputBuffer();
         bufIn.reset(body, body.length);
         try
         {
-            BootstrapMetadataMessage bsMetadataMessage = BootstrapMetadataMessage.serializer().deserialize(bufIn);
-            BootstrapMetadata[] bsMetadata = bsMetadataMessage.bsMetadata_;
+            StreamRequestMessage streamRequestMessage = StreamRequestMessage.serializer().deserialize(bufIn);
+            StreamRequestMetadata[] streamRequestMetadata = streamRequestMessage.streamRequestMetadata_;
 
-            for (BootstrapMetadata bsmd : bsMetadata)
+            for (StreamRequestMetadata srm : streamRequestMetadata)
             {
                 if (logger_.isDebugEnabled())
-                    logger_.debug(bsmd.toString());
-                Streaming.transferRanges(bsmd.target_, bsmd.ranges_, null);
+                    logger_.debug(srm.toString());
+                Streaming.transferRanges(srm.target_, srm.ranges_, null);
             }
         }
         catch (IOException ex)

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=888864&r1=888863&r2=888864&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java Wed Dec  9 16:36:31 2009
@@ -126,6 +126,16 @@
         }
     }
 
+    /**
+     * Request that the given source node transfer the given ranges to the local node
+     */
+    public static void requestRanges(InetAddress source, Collection<Range> ranges)
+    {
+        StreamRequestMetadata streamRequestMetadata = new StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges);
+        Message message = StreamRequestMessage.makeStreamRequestMessage(new StreamRequestMessage(streamRequestMetadata));
+        MessagingService.instance().sendOneWay(message, source);
+    }
+
     public static class StreamInitiateVerbHandler implements IVerbHandler
     {
         /*

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=888864&r1=888863&r2=888864&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Dec  9 16:36:31 2009
@@ -41,7 +41,6 @@
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.concurrent.StageManager;
 
@@ -83,14 +82,6 @@
         }
     };
 
-    private static Comparator<Row> rowComparator = new Comparator<Row>()
-    {
-        public int compare(Row r1, Row r2)
-        {
-            return keyComparator.compare(r1.key, r2.key);
-        }
-    };
-
     /**
      * Use this method to have this RowMutation applied
      * across all replicas. This method will take care
@@ -308,7 +299,6 @@
 
         List<Row> rows = new ArrayList<Row>();
         List<IAsyncResult> iars = new ArrayList<IAsyncResult>();
-        int commandIndex = 0;
 
         for (ReadCommand command: commands)
         {
@@ -337,7 +327,6 @@
             ReadResponse response = ReadResponse.serializer().deserialize(bufIn);
             if (response.row() != null)
                 rows.add(response.row());
-            commandIndex++;
         }
         return rows;
     }
@@ -412,7 +401,6 @@
 
         for (ReadCommand command: commands)
         {
-            // TODO: throw a thrift exception if we do not have N nodes
             assert !command.isDigestQuery();
             ReadCommand readMessageDigestOnly = command.copy();
             readMessageDigestOnly.setDigestQuery(true);
@@ -707,7 +695,6 @@
             List<InetAddress> endpoints = StorageService.instance().getLiveNaturalEndpoints(command.key);
             /* Remove the local storage endpoint from the list. */
             endpoints.remove(FBUtilities.getLocalAddress());
-            // TODO: throw a thrift exception if we do not have N nodes
 
             if (logger.isDebugEnabled())
                 logger.debug("weakreadlocal reading " + command);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=888864&r1=888863&r2=888864&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Dec  9 16:36:31 2009
@@ -42,6 +42,7 @@
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.io.Streaming;
+import org.apache.cassandra.io.StreamRequestVerbHandler;
 
 import org.apache.log4j.Logger;
 import org.apache.log4j.Level;
@@ -76,7 +77,7 @@
     public final static String streamInitiateDoneVerbHandler_ = "BOOTSTRAP-INITIATE-DONE-VERB-HANDLER";
     public final static String streamFinishedVerbHandler_ = "BOOTSTRAP-TERMINATE-VERB-HANDLER";
     public final static String dataFileVerbHandler_ = "DATA-FILE-VERB-HANDLER";
-    public final static String bootstrapMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
+    public final static String streamRequestVerbHandler_ = "BS-METADATA-VERB-HANDLER";
     public final static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";
     public final static String rangeSliceVerbHandler_ = "RANGE-SLICE-VERB-HANDLER";
     public final static String bootstrapTokenVerbHandler_ = "SPLITS-VERB-HANDLER";
@@ -216,7 +217,7 @@
         MessagingService.instance().registerVerbHandlers(rangeSliceVerbHandler_, new RangeSliceVerbHandler());
         // see BootStrapper for a summary of how the bootstrap verbs interact
         MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler());
-        MessagingService.instance().registerVerbHandlers(bootstrapMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );
+        MessagingService.instance().registerVerbHandlers(streamRequestVerbHandler_, new StreamRequestVerbHandler() );
         MessagingService.instance().registerVerbHandlers(streamInitiateVerbHandler_, new Streaming.StreamInitiateVerbHandler());
         MessagingService.instance().registerVerbHandlers(streamInitiateDoneVerbHandler_, new Streaming.StreamInitiateDoneVerbHandler());
         MessagingService.instance().registerVerbHandlers(streamFinishedVerbHandler_, new Streaming.StreamFinishedVerbHandler());