You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/09 17:36:33 UTC
svn commit: r888864 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra: dht/ io/ service/
Author: jbellis
Date: Wed Dec 9 16:36:31 2009
New Revision: 888864
URL: http://svn.apache.org/viewvc?rev=888864&view=rev
Log:
Rename BootstrapMetadata and friends to StreamRequest* as that is what those essentially are. Move them to cassandra.io where rest of streaming stuff resides. patch by Jaakko Laine; reviewed by jbellis for CASSANDRA-564
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java (with props)
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java (contents, props changed)
- copied, changed from r888712, incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java (contents, props changed)
- copied, changed from r888712, incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=888864&r1=888863&r2=888864&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Wed Dec 9 16:36:31 2009
@@ -33,21 +33,13 @@
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.net.*;
- import org.apache.cassandra.net.io.StreamContextManager;
- import org.apache.cassandra.net.io.IStreamComplete;
import org.apache.cassandra.service.StorageService;
- import org.apache.cassandra.service.StreamManager;
- import org.apache.cassandra.utils.LogUtil;
import org.apache.cassandra.utils.SimpleCondition;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.IFailureDetector;
- import org.apache.cassandra.io.DataInputBuffer;
- import org.apache.cassandra.io.SSTableReader;
- import org.apache.cassandra.io.SSTableWriter;
- import org.apache.cassandra.db.ColumnFamilyStore;
- import org.apache.cassandra.db.Table;
+ import org.apache.cassandra.io.Streaming;
import com.google.common.collect.Multimap;
import com.google.common.collect.ArrayListMultimap;
@@ -56,7 +48,7 @@
* This class handles the bootstrapping responsibilities for the local endpoint.
*
* - bootstrapTokenVerb asks the most-loaded node what Token to use to split its Range in two.
- * - bootstrapMetadataVerb tells source nodes to send us the necessary Ranges
+ * - streamRequestVerb tells source nodes to send us the necessary Ranges
* - source nodes send streamInitiateVerb to us to say "get ready to receive data" [if there is data to send]
* - when we have everything set up to receive the data, we send streamInitiateDoneVerb back to the source nodes and they start streaming
* - when streaming is complete, we send streamFinishedVerb to the source so it can clean up on its end
@@ -96,12 +88,10 @@
for (Map.Entry<InetAddress, Collection<Range>> entry : getWorkMap(rangesWithSourceTarget).asMap().entrySet())
{
InetAddress source = entry.getKey();
- if (logger.isDebugEnabled())
- logger.debug("Sending BootstrapMetadataMessage to " + source + " for " + StringUtils.join(entry.getValue(), ", "));
- BootstrapMetadata bsMetadata = new BootstrapMetadata(address, entry.getValue());
- Message message = BootstrapMetadataMessage.makeBootstrapMetadataMessage(new BootstrapMetadataMessage(bsMetadata));
- MessagingService.instance().sendOneWay(message, source);
StorageService.instance().addBootstrapSource(source);
+ if (logger.isDebugEnabled())
+ logger.debug("Requesting from " + source + " ranges " + StringUtils.join(entry.getValue(), ", "));
+ Streaming.requestRanges(source, entry.getValue());
}
}
}).start();
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java?rev=888864&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java Wed Dec 9 16:36:31 2009
@@ -0,0 +1,93 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.concurrent.StageManager;
+
+ /**
+ * This class encapsulates the message that needs to be sent to nodes
+ * that handoff data. The message contains information about ranges
+ * that need to be transferred and the target node.
+*/
+class StreamRequestMessage
+{
+ private static ICompactSerializer<StreamRequestMessage> serializer_;
+ static
+ {
+ serializer_ = new StreamRequestMessageSerializer();
+ }
+
+ protected static ICompactSerializer<StreamRequestMessage> serializer()
+ {
+ return serializer_;
+ }
+
+ protected static Message makeStreamRequestMessage(StreamRequestMessage streamRequestMessage)
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ try
+ {
+ StreamRequestMessage.serializer().serialize(streamRequestMessage, dos);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ return new Message(FBUtilities.getLocalAddress(), StageManager.streamStage_, StorageService.streamRequestVerbHandler_, bos.toByteArray() );
+ }
+
+ protected StreamRequestMetadata[] streamRequestMetadata_ = new StreamRequestMetadata[0];
+
+ // TODO only actually ever need one BM, not an array
+ StreamRequestMessage(StreamRequestMetadata... streamRequestMetadata)
+ {
+ assert streamRequestMetadata != null;
+ streamRequestMetadata_ = streamRequestMetadata;
+ }
+}
+
+class StreamRequestMessageSerializer implements ICompactSerializer<StreamRequestMessage>
+{
+ public void serialize(StreamRequestMessage streamRequestMessage, DataOutputStream dos) throws IOException
+ {
+ StreamRequestMetadata[] streamRequestMetadata = streamRequestMessage.streamRequestMetadata_;
+ dos.writeInt(streamRequestMetadata.length);
+ for (StreamRequestMetadata bsmd : streamRequestMetadata)
+ {
+ StreamRequestMetadata.serializer().serialize(bsmd, dos);
+ }
+ }
+
+ public StreamRequestMessage deserialize(DataInputStream dis) throws IOException
+ {
+ int size = dis.readInt();
+ StreamRequestMetadata[] streamRequestMetadata = new StreamRequestMetadata[size];
+ for ( int i = 0; i < size; ++i )
+ {
+ streamRequestMetadata[i] = StreamRequestMetadata.serializer().deserialize(dis);
+ }
+ return new StreamRequestMessage(streamRequestMetadata);
+ }
+}
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java (from r888712, incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java&r1=888712&r2=888864&rev=888864&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java Wed Dec 9 16:36:31 2009
@@ -16,37 +16,34 @@
* limitations under the License.
*/
-package org.apache.cassandra.dht;
+package org.apache.cassandra.io;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Collection;
-import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-import java.net.InetAddress;
-
-
+import org.apache.cassandra.dht.Range;
/**
- * This encapsulates information of the list of
- * ranges that a target node requires in order to
- * be bootstrapped. This will be bundled in a
- * BootstrapMetadataMessage and sent to nodes that
- * are going to handoff the data.
+ * This encapsulates information of the list of ranges that a target
+ * node requires to be transferred. This will be bundled in a
+ * StreamRequestsMessage and sent to nodes that are going to handoff
+ * the data.
*/
-class BootstrapMetadata
+class StreamRequestMetadata
{
- private static ICompactSerializer<BootstrapMetadata> serializer_;
+ private static ICompactSerializer<StreamRequestMetadata> serializer_;
static
{
- serializer_ = new BootstrapMetadataSerializer();
+ serializer_ = new StreamRequestMetadataSerializer();
}
- protected static ICompactSerializer<BootstrapMetadata> serializer()
+ protected static ICompactSerializer<StreamRequestMetadata> serializer()
{
return serializer_;
}
@@ -54,7 +51,7 @@
protected InetAddress target_;
protected Collection<Range> ranges_;
- BootstrapMetadata(InetAddress target, Collection<Range> ranges)
+ StreamRequestMetadata(InetAddress target, Collection<Range> ranges)
{
target_ = target;
ranges_ = ranges;
@@ -74,19 +71,19 @@
}
}
-class BootstrapMetadataSerializer implements ICompactSerializer<BootstrapMetadata>
+class StreamRequestMetadataSerializer implements ICompactSerializer<StreamRequestMetadata>
{
- public void serialize(BootstrapMetadata bsMetadata, DataOutputStream dos) throws IOException
+ public void serialize(StreamRequestMetadata srMetadata, DataOutputStream dos) throws IOException
{
- CompactEndPointSerializationHelper.serialize(bsMetadata.target_, dos);
- dos.writeInt(bsMetadata.ranges_.size());
- for (Range range : bsMetadata.ranges_)
+ CompactEndPointSerializationHelper.serialize(srMetadata.target_, dos);
+ dos.writeInt(srMetadata.ranges_.size());
+ for (Range range : srMetadata.ranges_)
{
Range.serializer().serialize(range, dos);
}
}
- public BootstrapMetadata deserialize(DataInputStream dis) throws IOException
+ public StreamRequestMetadata deserialize(DataInputStream dis) throws IOException
{
InetAddress target = CompactEndPointSerializationHelper.deserialize(dis);
int size = dis.readInt();
@@ -95,7 +92,7 @@
{
ranges.add(Range.serializer().deserialize(dis));
}
- return new BootstrapMetadata( target, ranges );
+ return new StreamRequestMetadata( target, ranges );
}
}
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java (from r888712, incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java&r1=888712&r2=888864&rev=888864&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java Wed Dec 9 16:36:31 2009
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.cassandra.dht;
+package org.apache.cassandra.io;
import java.io.File;
import java.io.IOException;
@@ -27,9 +27,6 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Table;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.io.Streaming;
import java.net.InetAddress;
import org.apache.cassandra.net.IVerbHandler;
@@ -43,34 +40,31 @@
import org.apache.commons.lang.StringUtils;
/**
- * This verb handler handles the BootstrapMetadataMessage that is sent
- * by the leader to the nodes that are responsible for handing off data.
+ * This verb handler handles the StreamRequestMessage that is sent by
+ * the node requesting range transfer.
*/
-public class BootstrapMetadataVerbHandler implements IVerbHandler
+public class StreamRequestVerbHandler implements IVerbHandler
{
- private static Logger logger_ = Logger.getLogger(BootstrapMetadataVerbHandler.class);
+ private static Logger logger_ = Logger.getLogger(StreamRequestVerbHandler.class);
public void doVerb(Message message)
{
if (logger_.isDebugEnabled())
- logger_.debug("Received a BootstrapMetadataMessage from " + message.getFrom());
-
- /* Cannot bootstrap another node if I'm in bootstrap mode myself! */
- assert !StorageService.instance().isBootstrapMode();
+ logger_.debug("Received a StreamRequestMessage from " + message.getFrom());
byte[] body = message.getMessageBody();
DataInputBuffer bufIn = new DataInputBuffer();
bufIn.reset(body, body.length);
try
{
- BootstrapMetadataMessage bsMetadataMessage = BootstrapMetadataMessage.serializer().deserialize(bufIn);
- BootstrapMetadata[] bsMetadata = bsMetadataMessage.bsMetadata_;
+ StreamRequestMessage streamRequestMessage = StreamRequestMessage.serializer().deserialize(bufIn);
+ StreamRequestMetadata[] streamRequestMetadata = streamRequestMessage.streamRequestMetadata_;
- for (BootstrapMetadata bsmd : bsMetadata)
+ for (StreamRequestMetadata srm : streamRequestMetadata)
{
if (logger_.isDebugEnabled())
- logger_.debug(bsmd.toString());
- Streaming.transferRanges(bsmd.target_, bsmd.ranges_, null);
+ logger_.debug(srm.toString());
+ Streaming.transferRanges(srm.target_, srm.ranges_, null);
}
}
catch (IOException ex)
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=888864&r1=888863&r2=888864&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java Wed Dec 9 16:36:31 2009
@@ -126,6 +126,16 @@
}
}
+ /**
+ * Request ranges to be transferred
+ */
+ public static void requestRanges(InetAddress source, Collection<Range> ranges)
+ {
+ StreamRequestMetadata streamRequestMetadata = new StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges);
+ Message message = StreamRequestMessage.makeStreamRequestMessage(new StreamRequestMessage(streamRequestMetadata));
+ MessagingService.instance().sendOneWay(message, source);
+ }
+
public static class StreamInitiateVerbHandler implements IVerbHandler
{
/*
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=888864&r1=888863&r2=888864&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Dec 9 16:36:31 2009
@@ -41,7 +41,6 @@
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.concurrent.StageManager;
@@ -83,14 +82,6 @@
}
};
- private static Comparator<Row> rowComparator = new Comparator<Row>()
- {
- public int compare(Row r1, Row r2)
- {
- return keyComparator.compare(r1.key, r2.key);
- }
- };
-
/**
* Use this method to have this RowMutation applied
* across all replicas. This method will take care
@@ -308,7 +299,6 @@
List<Row> rows = new ArrayList<Row>();
List<IAsyncResult> iars = new ArrayList<IAsyncResult>();
- int commandIndex = 0;
for (ReadCommand command: commands)
{
@@ -337,7 +327,6 @@
ReadResponse response = ReadResponse.serializer().deserialize(bufIn);
if (response.row() != null)
rows.add(response.row());
- commandIndex++;
}
return rows;
}
@@ -412,7 +401,6 @@
for (ReadCommand command: commands)
{
- // TODO: throw a thrift exception if we do not have N nodes
assert !command.isDigestQuery();
ReadCommand readMessageDigestOnly = command.copy();
readMessageDigestOnly.setDigestQuery(true);
@@ -707,7 +695,6 @@
List<InetAddress> endpoints = StorageService.instance().getLiveNaturalEndpoints(command.key);
/* Remove the local storage endpoint from the list. */
endpoints.remove(FBUtilities.getLocalAddress());
- // TODO: throw a thrift exception if we do not have N nodes
if (logger.isDebugEnabled())
logger.debug("weakreadlocal reading " + command);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=888864&r1=888863&r2=888864&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Dec 9 16:36:31 2009
@@ -42,6 +42,7 @@
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.io.Streaming;
+import org.apache.cassandra.io.StreamRequestVerbHandler;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
@@ -76,7 +77,7 @@
public final static String streamInitiateDoneVerbHandler_ = "BOOTSTRAP-INITIATE-DONE-VERB-HANDLER";
public final static String streamFinishedVerbHandler_ = "BOOTSTRAP-TERMINATE-VERB-HANDLER";
public final static String dataFileVerbHandler_ = "DATA-FILE-VERB-HANDLER";
- public final static String bootstrapMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
+ public final static String streamRequestVerbHandler_ = "BS-METADATA-VERB-HANDLER";
public final static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";
public final static String rangeSliceVerbHandler_ = "RANGE-SLICE-VERB-HANDLER";
public final static String bootstrapTokenVerbHandler_ = "SPLITS-VERB-HANDLER";
@@ -216,7 +217,7 @@
MessagingService.instance().registerVerbHandlers(rangeSliceVerbHandler_, new RangeSliceVerbHandler());
// see BootStrapper for a summary of how the bootstrap verbs interact
MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler());
- MessagingService.instance().registerVerbHandlers(bootstrapMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );
+ MessagingService.instance().registerVerbHandlers(streamRequestVerbHandler_, new StreamRequestVerbHandler() );
MessagingService.instance().registerVerbHandlers(streamInitiateVerbHandler_, new Streaming.StreamInitiateVerbHandler());
MessagingService.instance().registerVerbHandlers(streamInitiateDoneVerbHandler_, new Streaming.StreamInitiateDoneVerbHandler());
MessagingService.instance().registerVerbHandlers(streamFinishedVerbHandler_, new Streaming.StreamFinishedVerbHandler());