You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC
svn commit: r799331 [11/29] - in /incubator/cassandra/trunk: ./
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java Thu Jul 30 15:30:21 2009
@@ -1,99 +1,99 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.io.StreamContextManager;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.net.io.*;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class BootstrapInitiateMessage implements Serializable
-{
- private static ICompactSerializer<BootstrapInitiateMessage> serializer_;
-
- static
- {
- serializer_ = new BootstrapInitiateMessageSerializer();
- }
-
- public static ICompactSerializer<BootstrapInitiateMessage> serializer()
- {
- return serializer_;
- }
-
- public static Message makeBootstrapInitiateMessage(BootstrapInitiateMessage biMessage) throws IOException
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream( bos );
- BootstrapInitiateMessage.serializer().serialize(biMessage, dos);
- return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateVerbHandler_, bos.toByteArray() );
- }
-
- protected StreamContextManager.StreamContext[] streamContexts_ = new StreamContextManager.StreamContext[0];
-
- public BootstrapInitiateMessage(StreamContextManager.StreamContext[] streamContexts)
- {
- streamContexts_ = streamContexts;
- }
-
- public StreamContextManager.StreamContext[] getStreamContext()
- {
- return streamContexts_;
- }
-}
-
-class BootstrapInitiateMessageSerializer implements ICompactSerializer<BootstrapInitiateMessage>
-{
- public void serialize(BootstrapInitiateMessage bim, DataOutputStream dos) throws IOException
- {
- dos.writeInt(bim.streamContexts_.length);
- for ( StreamContextManager.StreamContext streamContext : bim.streamContexts_ )
- {
- StreamContextManager.StreamContext.serializer().serialize(streamContext, dos);
- }
- }
-
- public BootstrapInitiateMessage deserialize(DataInputStream dis) throws IOException
- {
- int size = dis.readInt();
- StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[0];
- if ( size > 0 )
- {
- streamContexts = new StreamContextManager.StreamContext[size];
- for ( int i = 0; i < size; ++i )
- {
- streamContexts[i] = StreamContextManager.StreamContext.serializer().deserialize(dis);
- }
- }
- return new BootstrapInitiateMessage(streamContexts);
- }
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.net.io.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class BootstrapInitiateMessage implements Serializable
+{
+    /* Single serializer instance handed out by serializer(). */
+    private static ICompactSerializer<BootstrapInitiateMessage> serializer_ = new BootstrapInitiateMessageSerializer();
+
+    /* Stream contexts carried by this message; overwritten by the constructor argument. */
+    protected StreamContextManager.StreamContext[] streamContexts_ = new StreamContextManager.StreamContext[0];
+
+    public BootstrapInitiateMessage(StreamContextManager.StreamContext[] streamContexts)
+    {
+        this.streamContexts_ = streamContexts;
+    }
+
+    /**
+     * @return the serializer used to write and read BootstrapInitiateMessage instances.
+     */
+    public static ICompactSerializer<BootstrapInitiateMessage> serializer()
+    {
+        return serializer_;
+    }
+
+    /**
+     * Serializes the given message into an in-memory byte array and wraps the bytes
+     * in a Message built from the local storage endpoint, an empty string and
+     * StorageService.bootStrapInitiateVerbHandler_.
+     *
+     * @param biMessage the message to serialize
+     * @return the Message carrying the serialized bytes as its body
+     * @throws IOException if serialization fails
+     */
+    public static Message makeBootstrapInitiateMessage(BootstrapInitiateMessage biMessage) throws IOException
+    {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        serializer_.serialize(biMessage, new DataOutputStream( buffer ));
+        return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateVerbHandler_, buffer.toByteArray() );
+    }
+
+    /**
+     * @return the stream contexts this message was constructed with.
+     */
+    public StreamContextManager.StreamContext[] getStreamContext()
+    {
+        return streamContexts_;
+    }
+}
+
+class BootstrapInitiateMessageSerializer implements ICompactSerializer<BootstrapInitiateMessage>
+{
+    /**
+     * Writes the number of stream contexts as an int, followed by each
+     * context in array order using the StreamContext serializer.
+     */
+    public void serialize(BootstrapInitiateMessage bim, DataOutputStream dos) throws IOException
+    {
+        StreamContextManager.StreamContext[] contexts = bim.streamContexts_;
+        dos.writeInt(contexts.length);
+        for ( int idx = 0; idx < contexts.length; ++idx )
+        {
+            StreamContextManager.StreamContext.serializer().serialize(contexts[idx], dos);
+        }
+    }
+
+    /**
+     * Reads an int count and then that many stream contexts. A count of
+     * zero or less yields a message holding an empty array.
+     */
+    public BootstrapInitiateMessage deserialize(DataInputStream dis) throws IOException
+    {
+        int count = dis.readInt();
+        if ( count <= 0 )
+        {
+            return new BootstrapInitiateMessage(new StreamContextManager.StreamContext[0]);
+        }
+
+        StreamContextManager.StreamContext[] contexts = new StreamContextManager.StreamContext[count];
+        int idx = 0;
+        while ( idx < count )
+        {
+            contexts[idx] = StreamContextManager.StreamContext.serializer().deserialize(dis);
+            ++idx;
+        }
+        return new BootstrapInitiateMessage(contexts);
+    }
+}
+
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java Thu Jul 30 15:30:21 2009
@@ -1,102 +1,102 @@
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-import org.apache.cassandra.net.EndPoint;
-
-
-
-/**
- * This encapsulates information of the list of
- * ranges that a target node requires in order to
- * be bootstrapped. This will be bundled in a
- * BootstrapMetadataMessage and sent to nodes that
- * are going to handoff the data.
-*/
-class BootstrapMetadata
-{
- private static ICompactSerializer<BootstrapMetadata> serializer_;
- static
- {
- serializer_ = new BootstrapMetadataSerializer();
- }
-
- protected static ICompactSerializer<BootstrapMetadata> serializer()
- {
- return serializer_;
- }
-
- protected EndPoint target_;
- protected List<Range> ranges_;
-
- BootstrapMetadata(EndPoint target, List<Range> ranges)
- {
- target_ = target;
- ranges_ = ranges;
- }
-
- public String toString()
- {
- StringBuilder sb = new StringBuilder("");
- sb.append(target_);
- sb.append("------->");
- for ( Range range : ranges_ )
- {
- sb.append(range);
- sb.append(" ");
- }
- return sb.toString();
- }
-}
-
-class BootstrapMetadataSerializer implements ICompactSerializer<BootstrapMetadata>
-{
- public void serialize(BootstrapMetadata bsMetadata, DataOutputStream dos) throws IOException
- {
- CompactEndPointSerializationHelper.serialize(bsMetadata.target_, dos);
- int size = (bsMetadata.ranges_ == null) ? 0 : bsMetadata.ranges_.size();
- dos.writeInt(size);
-
- for ( Range range : bsMetadata.ranges_ )
- {
- Range.serializer().serialize(range, dos);
- }
- }
-
- public BootstrapMetadata deserialize(DataInputStream dis) throws IOException
- {
- EndPoint target = CompactEndPointSerializationHelper.deserialize(dis);
- int size = dis.readInt();
- List<Range> ranges = (size == 0) ? null : new ArrayList<Range>();
- for( int i = 0; i < size; ++i )
- {
- ranges.add(Range.serializer().deserialize(dis));
- }
- return new BootstrapMetadata( target, ranges );
- }
-}
-
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+
+
+
+/**
+ * This encapsulates information of the list of
+ * ranges that a target node requires in order to
+ * be bootstrapped. This will be bundled in a
+ * BootstrapMetadataMessage and sent to nodes that
+ * are going to handoff the data.
+*/
+class BootstrapMetadata
+{
+    private static ICompactSerializer<BootstrapMetadata> serializer_;
+    static
+    {
+        serializer_ = new BootstrapMetadataSerializer();
+    }
+
+    /**
+     * @return the serializer used to write and read BootstrapMetadata instances.
+     */
+    protected static ICompactSerializer<BootstrapMetadata> serializer()
+    {
+        return serializer_;
+    }
+
+    protected EndPoint target_;
+    protected List<Range> ranges_;
+
+    /**
+     * @param target the endpoint stored in target_
+     * @param ranges the ranges stored in ranges_; null is stored as an empty list
+     */
+    BootstrapMetadata(EndPoint target, List<Range> ranges)
+    {
+        target_ = target;
+        /*
+         * Normalize null to an empty list so that code iterating over
+         * ranges_ or calling ranges_.size() does not hit a NullPointerException.
+         */
+        ranges_ = (ranges == null) ? new ArrayList<Range>() : ranges;
+    }
+
+    /**
+     * @return the target followed by "------->" and each range followed by a space.
+     */
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder("");
+        sb.append(target_);
+        sb.append("------->");
+        /* ranges_ is protected and non-final, so it can still be reset to null after construction. */
+        if ( ranges_ != null )
+        {
+            for ( Range range : ranges_ )
+            {
+                sb.append(range);
+                sb.append(" ");
+            }
+        }
+        return sb.toString();
+    }
+}
+
+class BootstrapMetadataSerializer implements ICompactSerializer<BootstrapMetadata>
+{
+    /**
+     * Writes the target endpoint, then the number of ranges as an int,
+     * then each range in list order. A null range list is written as a
+     * count of zero with no range entries.
+     */
+    public void serialize(BootstrapMetadata bsMetadata, DataOutputStream dos) throws IOException
+    {
+        CompactEndPointSerializationHelper.serialize(bsMetadata.target_, dos);
+        List<Range> ranges = bsMetadata.ranges_;
+        int size = (ranges == null) ? 0 : ranges.size();
+        dos.writeInt(size);
+
+        /* The size above already treats null as empty; the loop must do the same. */
+        if ( ranges == null )
+        {
+            return;
+        }
+        for ( Range range : ranges )
+        {
+            Range.serializer().serialize(range, dos);
+        }
+    }
+
+    /**
+     * Reads the target endpoint, an int count, and then that many ranges.
+     * The returned metadata always holds a non-null list; a count of zero
+     * yields an empty list so callers can call size() or iterate safely.
+     */
+    public BootstrapMetadata deserialize(DataInputStream dis) throws IOException
+    {
+        EndPoint target = CompactEndPointSerializationHelper.deserialize(dis);
+        int size = dis.readInt();
+        List<Range> ranges = new ArrayList<Range>();
+        for( int i = 0; i < size; ++i )
+        {
+            ranges.add(Range.serializer().deserialize(dis));
+        }
+        return new BootstrapMetadata( target, ranges );
+    }
+}
+
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java Thu Jul 30 15:30:21 2009
@@ -1,90 +1,90 @@
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
-
-
-
-/**
- * This class encapsulates the message that needs to be sent
- * to nodes that handoff data. The message contains information
- * about the node to be bootstrapped and the ranges with which
- * it needs to be bootstrapped.
-*/
-class BootstrapMetadataMessage
-{
- private static ICompactSerializer<BootstrapMetadataMessage> serializer_;
- static
- {
- serializer_ = new BootstrapMetadataMessageSerializer();
- }
-
- protected static ICompactSerializer<BootstrapMetadataMessage> serializer()
- {
- return serializer_;
- }
-
- protected static Message makeBootstrapMetadataMessage(BootstrapMetadataMessage bsMetadataMessage) throws IOException
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream( bos );
- BootstrapMetadataMessage.serializer().serialize(bsMetadataMessage, dos);
- return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bsMetadataVerbHandler_, bos.toByteArray() );
- }
-
- protected BootstrapMetadata[] bsMetadata_ = new BootstrapMetadata[0];
-
- BootstrapMetadataMessage(BootstrapMetadata[] bsMetadata)
- {
- bsMetadata_ = bsMetadata;
- }
-}
-
-class BootstrapMetadataMessageSerializer implements ICompactSerializer<BootstrapMetadataMessage>
-{
- public void serialize(BootstrapMetadataMessage bsMetadataMessage, DataOutputStream dos) throws IOException
- {
- BootstrapMetadata[] bsMetadata = bsMetadataMessage.bsMetadata_;
- int size = (bsMetadata == null) ? 0 : bsMetadata.length;
- dos.writeInt(size);
- for ( BootstrapMetadata bsmd : bsMetadata )
- {
- BootstrapMetadata.serializer().serialize(bsmd, dos);
- }
- }
-
- public BootstrapMetadataMessage deserialize(DataInputStream dis) throws IOException
- {
- int size = dis.readInt();
- BootstrapMetadata[] bsMetadata = new BootstrapMetadata[size];
- for ( int i = 0; i < size; ++i )
- {
- bsMetadata[i] = BootstrapMetadata.serializer().deserialize(dis);
- }
- return new BootstrapMetadataMessage(bsMetadata);
- }
-}
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+
+
+
+/**
+ * This class encapsulates the message that needs to be sent
+ * to nodes that handoff data. The message contains information
+ * about the node to be bootstrapped and the ranges with which
+ * it needs to be bootstrapped.
+*/
+class BootstrapMetadataMessage
+{
+    /* Single serializer instance handed out by serializer(). */
+    private static ICompactSerializer<BootstrapMetadataMessage> serializer_ = new BootstrapMetadataMessageSerializer();
+
+    /* Metadata entries carried by this message; overwritten by the constructor argument. */
+    protected BootstrapMetadata[] bsMetadata_ = new BootstrapMetadata[0];
+
+    BootstrapMetadataMessage(BootstrapMetadata[] bsMetadata)
+    {
+        this.bsMetadata_ = bsMetadata;
+    }
+
+    /**
+     * @return the serializer used to write and read BootstrapMetadataMessage instances.
+     */
+    protected static ICompactSerializer<BootstrapMetadataMessage> serializer()
+    {
+        return serializer_;
+    }
+
+    /**
+     * Serializes the given message into an in-memory byte array and wraps the bytes
+     * in a Message built from the local storage endpoint, an empty string and
+     * StorageService.bsMetadataVerbHandler_.
+     *
+     * @param bsMetadataMessage the message to serialize
+     * @return the Message carrying the serialized bytes as its body
+     * @throws IOException if serialization fails
+     */
+    protected static Message makeBootstrapMetadataMessage(BootstrapMetadataMessage bsMetadataMessage) throws IOException
+    {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        serializer_.serialize(bsMetadataMessage, new DataOutputStream( buffer ));
+        return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bsMetadataVerbHandler_, buffer.toByteArray() );
+    }
+}
+
+class BootstrapMetadataMessageSerializer implements ICompactSerializer<BootstrapMetadataMessage>
+{
+    /**
+     * Writes the number of metadata entries as an int, followed by each
+     * entry in array order. A null array is written as a count of zero
+     * with no entries.
+     */
+    public void serialize(BootstrapMetadataMessage bsMetadataMessage, DataOutputStream dos) throws IOException
+    {
+        BootstrapMetadata[] bsMetadata = bsMetadataMessage.bsMetadata_;
+        int size = (bsMetadata == null) ? 0 : bsMetadata.length;
+        dos.writeInt(size);
+
+        /* The size above already treats null as empty; the loop must do the same. */
+        if ( bsMetadata == null )
+        {
+            return;
+        }
+        for ( BootstrapMetadata bsmd : bsMetadata )
+        {
+            BootstrapMetadata.serializer().serialize(bsmd, dos);
+        }
+    }
+
+    /**
+     * Reads an int count and then that many metadata entries.
+     *
+     * @throws IOException if the stream cannot be read or the count is negative
+     */
+    public BootstrapMetadataMessage deserialize(DataInputStream dis) throws IOException
+    {
+        int size = dis.readInt();
+        /* Report a corrupt count through the declared exception instead of an unchecked NegativeArraySizeException. */
+        if ( size < 0 )
+        {
+            throw new IOException("Invalid BootstrapMetadata count: " + size);
+        }
+        BootstrapMetadata[] bsMetadata = new BootstrapMetadata[size];
+        for ( int i = 0; i < size; ++i )
+        {
+            bsMetadata[i] = BootstrapMetadata.serializer().deserialize(dis);
+        }
+        return new BootstrapMetadataMessage(bsMetadata);
+    }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java Thu Jul 30 15:30:21 2009
@@ -1,173 +1,173 @@
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.io.StreamContextManager;
-import org.apache.cassandra.service.StreamManager;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-/**
- * This verb handler handles the BootstrapMetadataMessage that is sent
- * by the leader to the nodes that are responsible for handing off data.
-*/
-public class BootstrapMetadataVerbHandler implements IVerbHandler
-{
- private static Logger logger_ = Logger.getLogger(BootstrapMetadataVerbHandler.class);
-
- public void doVerb(Message message)
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Received a BootstrapMetadataMessage from " + message.getFrom());
- byte[] body = message.getMessageBody();
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(body, body.length);
- try
- {
- BootstrapMetadataMessage bsMetadataMessage = BootstrapMetadataMessage.serializer().deserialize(bufIn);
- BootstrapMetadata[] bsMetadata = bsMetadataMessage.bsMetadata_;
-
- /*
- * This is for debugging purposes. Remove later.
- */
- for ( BootstrapMetadata bsmd : bsMetadata )
- {
- if (logger_.isDebugEnabled())
- logger_.debug(bsmd.toString());
- }
-
- for ( BootstrapMetadata bsmd : bsMetadata )
- {
- long startTime = System.currentTimeMillis();
- doTransfer(bsmd.target_, bsmd.ranges_);
- if (logger_.isDebugEnabled())
- logger_.debug("Time taken to boostrap " +
- bsmd.target_ +
- " is " +
- (System.currentTimeMillis() - startTime) +
- " msecs.");
- }
- }
- catch ( IOException ex )
- {
- logger_.info(LogUtil.throwableToString(ex));
- }
- }
-
- /*
- * This method needs to figure out the files on disk
- * locally for each range and then stream them using
- * the Bootstrap protocol to the target endpoint.
- */
- private void doTransfer(EndPoint target, List<Range> ranges) throws IOException
- {
- if ( ranges.size() == 0 )
- {
- if (logger_.isDebugEnabled())
- logger_.debug("No ranges to give scram ...");
- return;
- }
-
- /* Just for debugging process - remove later */
- for ( Range range : ranges )
- {
- StringBuilder sb = new StringBuilder("");
- sb.append(range.toString());
- sb.append(" ");
- if (logger_.isDebugEnabled())
- logger_.debug("Beginning transfer process to " + target + " for ranges " + sb.toString());
- }
-
- /*
- * (1) First we dump all the memtables to disk.
- * (2) Run a version of compaction which will basically
- * put the keys in the range specified into a directory
- * named as per the endpoint it is destined for inside the
- * bootstrap directory.
- * (3) Handoff the data.
- */
- List<String> tables = DatabaseDescriptor.getTables();
- for ( String tName : tables )
- {
- Table table = Table.open(tName);
- if (logger_.isDebugEnabled())
- logger_.debug("Flushing memtables ...");
- table.flush(false);
- if (logger_.isDebugEnabled())
- logger_.debug("Forcing compaction ...");
- /* Get the counting bloom filter for each endpoint and the list of files that need to be streamed */
- List<String> fileList = new ArrayList<String>();
- boolean bVal = table.forceCompaction(ranges, target, fileList);
- doHandoff(target, fileList, tName);
- }
- }
-
- /**
- * Stream the files in the bootstrap directory over to the
- * node being bootstrapped.
- */
- private void doHandoff(EndPoint target, List<String> fileList, String table) throws IOException
- {
- List<File> filesList = new ArrayList<File>();
- for(String file : fileList)
- {
- filesList.add(new File(file));
- }
- File[] files = filesList.toArray(new File[0]);
- StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[files.length];
- int i = 0;
- for ( File file : files )
- {
- streamContexts[i] = new StreamContextManager.StreamContext(file.getAbsolutePath(), file.length(), table);
- if (logger_.isDebugEnabled())
- logger_.debug("Stream context metadata " + streamContexts[i]);
- ++i;
- }
-
- if ( files.length > 0 )
- {
- /* Set up the stream manager with the files that need to streamed */
- StreamManager.instance(target).addFilesToStream(streamContexts);
- /* Send the bootstrap initiate message */
- BootstrapInitiateMessage biMessage = new BootstrapInitiateMessage(streamContexts);
- Message message = BootstrapInitiateMessage.makeBootstrapInitiateMessage(biMessage);
- if (logger_.isDebugEnabled())
- logger_.debug("Sending a bootstrap initiate message to " + target + " ...");
- MessagingService.getMessagingInstance().sendOneWay(message, target);
- if (logger_.isDebugEnabled())
- logger_.debug("Waiting for transfer to " + target + " to complete");
- StreamManager.instance(target).waitForStreamCompletion();
- if (logger_.isDebugEnabled())
- logger_.debug("Done with transfer to " + target);
- }
- }
-}
-
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.service.StreamManager;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * This verb handler handles the BootstrapMetadataMessage that is sent
+ * by the leader to the nodes that are responsible for handing off data.
+*/
+public class BootstrapMetadataVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger(BootstrapMetadataVerbHandler.class);
+
+    /**
+     * Deserializes a BootstrapMetadataMessage from the message body and runs
+     * doTransfer for each metadata entry, in order. An IOException raised while
+     * deserializing or transferring is logged and not propagated.
+     */
+    public void doVerb(Message message)
+    {
+        if (logger_.isDebugEnabled())
+            logger_.debug("Received a BootstrapMetadataMessage from " + message.getFrom());
+        byte[] body = message.getMessageBody();
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(body, body.length);
+        try
+        {
+            BootstrapMetadataMessage bsMetadataMessage = BootstrapMetadataMessage.serializer().deserialize(bufIn);
+            BootstrapMetadata[] bsMetadata = bsMetadataMessage.bsMetadata_;
+
+            /*
+             * This is for debugging purposes. Remove later.
+             */
+            if (logger_.isDebugEnabled())
+            {
+                for ( BootstrapMetadata bsmd : bsMetadata )
+                {
+                    logger_.debug(bsmd.toString());
+                }
+            }
+
+            for ( BootstrapMetadata bsmd : bsMetadata )
+            {
+                long startTime = System.currentTimeMillis();
+                doTransfer(bsmd.target_, bsmd.ranges_);
+                if (logger_.isDebugEnabled())
+                    logger_.debug("Time taken to boostrap " +
+                                  bsmd.target_ +
+                                  " is " +
+                                  (System.currentTimeMillis() - startTime) +
+                                  " msecs.");
+            }
+        }
+        catch ( IOException ex )
+        {
+            logger_.info(LogUtil.throwableToString(ex));
+        }
+    }
+
+    /*
+     * This method needs to figure out the files on disk
+     * locally for each range and then stream them using
+     * the Bootstrap protocol to the target endpoint.
+     */
+    private void doTransfer(EndPoint target, List<Range> ranges) throws IOException
+    {
+        /* A null list is treated like an empty one: there is nothing to transfer. */
+        if ( ranges == null || ranges.isEmpty() )
+        {
+            if (logger_.isDebugEnabled())
+                logger_.debug("No ranges to give scram ...");
+            return;
+        }
+
+        /* Just for debugging process - remove later. All ranges are reported in one message. */
+        if (logger_.isDebugEnabled())
+        {
+            StringBuilder sb = new StringBuilder("");
+            for ( Range range : ranges )
+            {
+                sb.append(range.toString());
+                sb.append(" ");
+            }
+            logger_.debug("Beginning transfer process to " + target + " for ranges " + sb.toString());
+        }
+
+        /*
+         * (1) First we dump all the memtables to disk.
+         * (2) Run a version of compaction which will basically
+         *     put the keys in the range specified into a directory
+         *     named as per the endpoint it is destined for inside the
+         *     bootstrap directory.
+         * (3) Handoff the data.
+         */
+        List<String> tables = DatabaseDescriptor.getTables();
+        for ( String tName : tables )
+        {
+            Table table = Table.open(tName);
+            if (logger_.isDebugEnabled())
+                logger_.debug("Flushing memtables ...");
+            table.flush(false);
+            if (logger_.isDebugEnabled())
+                logger_.debug("Forcing compaction ...");
+            /* Get the counting bloom filter for each endpoint and the list of files that need to be streamed */
+            List<String> fileList = new ArrayList<String>();
+            /* The boolean result of forceCompaction is not used; fileList is passed in and then handed to doHandoff. */
+            table.forceCompaction(ranges, target, fileList);
+            doHandoff(target, fileList, tName);
+        }
+    }
+
+    /**
+     * Stream the files in the bootstrap directory over to the
+     * node being bootstrapped. Does nothing when fileList is empty.
+     */
+    private void doHandoff(EndPoint target, List<String> fileList, String table) throws IOException
+    {
+        if ( fileList.isEmpty() )
+        {
+            return;
+        }
+
+        /* Build one stream context per file, in list order. */
+        StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[fileList.size()];
+        int i = 0;
+        for ( String fileName : fileList )
+        {
+            File file = new File(fileName);
+            streamContexts[i] = new StreamContextManager.StreamContext(file.getAbsolutePath(), file.length(), table);
+            if (logger_.isDebugEnabled())
+                logger_.debug("Stream context metadata " + streamContexts[i]);
+            ++i;
+        }
+
+        /* Set up the stream manager with the files that need to streamed */
+        StreamManager.instance(target).addFilesToStream(streamContexts);
+        /* Send the bootstrap initiate message */
+        BootstrapInitiateMessage biMessage = new BootstrapInitiateMessage(streamContexts);
+        Message message = BootstrapInitiateMessage.makeBootstrapInitiateMessage(biMessage);
+        if (logger_.isDebugEnabled())
+            logger_.debug("Sending a bootstrap initiate message to " + target + " ...");
+        MessagingService.getMessagingInstance().sendOneWay(message, target);
+        if (logger_.isDebugEnabled())
+            logger_.debug("Waiting for transfer to " + target + " to complete");
+        StreamManager.instance(target).waitForStreamCompletion();
+        if (logger_.isDebugEnabled())
+            logger_.debug("Done with transfer to " + target);
+    }
+}
+
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java Thu Jul 30 15:30:21 2009
@@ -1,49 +1,49 @@
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import org.apache.cassandra.net.EndPoint;
-
-/**
- * This class encapsulates who is the source and the
- * target of a bootstrap for a particular range.
- */
-class BootstrapSourceTarget
-{
- protected EndPoint source_;
- protected EndPoint target_;
-
- BootstrapSourceTarget(EndPoint source, EndPoint target)
- {
- source_ = source;
- target_ = target;
- }
-
- public String toString()
- {
- StringBuilder sb = new StringBuilder("");
- sb.append("SOURCE: ");
- sb.append(source_);
- sb.append(" ----> ");
- sb.append("TARGET: ");
- sb.append(target_);
- sb.append(" ");
- return sb.toString();
- }
-}
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * Holds the two ends of a bootstrap transfer for a particular range:
+ * the endpoint that acts as the source and the endpoint that is the target.
+ */
+class BootstrapSourceTarget
+{
+    protected EndPoint source_;
+    protected EndPoint target_;
+
+    BootstrapSourceTarget(EndPoint source, EndPoint target)
+    {
+        this.source_ = source;
+        this.target_ = target;
+    }
+
+    /**
+     * @return the pair rendered as {@code "SOURCE: <source> ----> TARGET: <target> "};
+     *         the trailing space is part of the result.
+     */
+    public String toString()
+    {
+        return "SOURCE: " + source_ + " ----> " + "TARGET: " + target_ + " ";
+    }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java Thu Jul 30 15:30:21 2009
@@ -1,48 +1,48 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import java.util.Comparator;
-
-public interface IPartitioner
-{
- /**
- * transform key to on-disk format s.t. keys are stored in node comparison order.
- * this lets bootstrap rip out parts of the sstable sequentially instead of doing random seeks.
- *
- * @param key the raw, client-facing key
- * @return decorated on-disk version of key
- */
- public String decorateKey(String key);
-
- public String undecorateKey(String decoratedKey);
-
- public Comparator<String> getDecoratedKeyComparator();
-
- public Comparator<String> getReverseDecoratedKeyComparator();
-
- /**
- * @return the token to use for this node if none was saved
- */
- public Token getInitialToken(String key);
-
- public Token getDefaultToken();
-
- public Token.TokenFactory getTokenFactory();
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.util.Comparator;
+
+/**
+ * Defines how client keys are decorated for on-disk storage and how the
+ * tokens for a node are obtained.
+ */
+public interface IPartitioner
+{
+ /**
+ * transform key to on-disk format s.t. keys are stored in node comparison order.
+ * this lets bootstrap rip out parts of the sstable sequentially instead of doing random seeks.
+ *
+ * @param key the raw, client-facing key
+ * @return decorated on-disk version of key
+ */
+ public String decorateKey(String key);
+
+ /**
+ * Presumably the inverse of {@link #decorateKey(String)} -- TODO confirm against the implementations.
+ *
+ * @param decoratedKey a key in its decorated form
+ * @return presumably the raw, client-facing key
+ */
+ public String undecorateKey(String decoratedKey);
+
+ /**
+ * @return a comparator over decorated keys; presumably the "node comparison order"
+ * referred to by {@link #decorateKey(String)} -- TODO confirm
+ */
+ public Comparator<String> getDecoratedKeyComparator();
+
+ /**
+ * @return a comparator over decorated keys; presumably the reverse ordering of
+ * {@link #getDecoratedKeyComparator()} -- TODO confirm
+ */
+ public Comparator<String> getReverseDecoratedKeyComparator();
+
+ /**
+ * NOTE(review): the role of <code>key</code> is not documented in this interface; confirm against the implementations.
+ *
+ * @return the token to use for this node if none was saved
+ */
+ public Token getInitialToken(String key);
+
+ /**
+ * NOTE(review): when this token is used instead of {@link #getInitialToken(String)}
+ * is not visible from this interface.
+ *
+ * @return the default token
+ */
+ public Token getDefaultToken();
+
+ /**
+ * @return the token factory that belongs to this partitioner
+ */
+ public Token.TokenFactory getTokenFactory();
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java Thu Jul 30 15:30:21 2009
@@ -1,229 +1,229 @@
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
-
- import org.apache.log4j.Logger;
-
- import org.apache.cassandra.net.EndPoint;
- import org.apache.cassandra.net.Message;
- import org.apache.cassandra.net.MessagingService;
-
-
-class LeaveJoinProtocolHelper
-{
- private static Logger logger_ = Logger.getLogger(LeaveJoinProtocolHelper.class);
-
- /**
- * Give a range a-------------b which is being split as
- * a-----x-----y-----b then we want a mapping from
- * (a, b] --> (a, x], (x, y], (y, b]
- */
- protected static Map<Range, List<Range>> getRangeSplitRangeMapping(Range[] oldRanges, Token[] allTokens)
- {
- Map<Range, List<Range>> splitRanges = new HashMap<Range, List<Range>>();
- Token[] tokens = new Token[allTokens.length];
- System.arraycopy(allTokens, 0, tokens, 0, tokens.length);
- Arrays.sort(tokens);
-
- Range prevRange = null;
- Token prevToken = null;
- boolean bVal = false;
-
- for ( Range oldRange : oldRanges )
- {
- if (bVal)
- {
- bVal = false;
- List<Range> subRanges = splitRanges.get(prevRange);
- if ( subRanges != null )
- subRanges.add( new Range(prevToken, prevRange.right()) );
- }
-
- prevRange = oldRange;
- prevToken = oldRange.left();
- for (Token token : tokens)
- {
- List<Range> subRanges = splitRanges.get(oldRange);
- if ( oldRange.contains(token) )
- {
- if ( subRanges == null )
- {
- subRanges = new ArrayList<Range>();
- splitRanges.put(oldRange, subRanges);
- }
- subRanges.add( new Range(prevToken, token) );
- prevToken = token;
- bVal = true;
- }
- else
- {
- if ( bVal )
- {
- bVal = false;
- subRanges.add( new Range(prevToken, oldRange.right()) );
- }
- }
- }
- }
- /* This is to handle the last range being processed. */
- if ( bVal )
- {
- bVal = false;
- List<Range> subRanges = splitRanges.get(prevRange);
- subRanges.add( new Range(prevToken, prevRange.right()) );
- }
- return splitRanges;
- }
-
- protected static Map<Range, List<BootstrapSourceTarget>> getRangeSourceTargetInfo(Map<Range, List<EndPoint>> oldRangeToEndPointMap, Map<Range, List<EndPoint>> newRangeToEndPointMap)
- {
- Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = new HashMap<Range, List<BootstrapSourceTarget>>();
- /*
- * Basically calculate for each range the endpoints handling the
- * range in the old token set and in the new token set. Whoever
- * gets bumped out of the top N will have to hand off that range
- * to the new dude.
- */
- Set<Range> oldRangeSet = oldRangeToEndPointMap.keySet();
- for(Range range : oldRangeSet)
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Attempting to figure out the dudes who are bumped out for " + range + " ...");
- List<EndPoint> oldEndPoints = oldRangeToEndPointMap.get(range);
- List<EndPoint> newEndPoints = newRangeToEndPointMap.get(range);
- if ( newEndPoints != null )
- {
- List<EndPoint> newEndPoints2 = new ArrayList<EndPoint>(newEndPoints);
- for ( EndPoint newEndPoint : newEndPoints2 )
- {
- if ( oldEndPoints.contains(newEndPoint) )
- {
- oldEndPoints.remove(newEndPoint);
- newEndPoints.remove(newEndPoint);
- }
- }
- }
- else
- {
- logger_.warn("Trespassing - scram");
- }
- if (logger_.isDebugEnabled())
- logger_.debug("Done figuring out the dudes who are bumped out for range " + range + " ...");
- }
- for ( Range range : oldRangeSet )
- {
- List<EndPoint> oldEndPoints = oldRangeToEndPointMap.get(range);
- List<EndPoint> newEndPoints = newRangeToEndPointMap.get(range);
- List<BootstrapSourceTarget> srcTarget = rangesWithSourceTarget.get(range);
- if ( srcTarget == null )
- {
- srcTarget = new ArrayList<BootstrapSourceTarget>();
- rangesWithSourceTarget.put(range, srcTarget);
- }
- int i = 0;
- for ( EndPoint oldEndPoint : oldEndPoints )
- {
- srcTarget.add( new BootstrapSourceTarget(oldEndPoint, newEndPoints.get(i++)) );
- }
- }
- return rangesWithSourceTarget;
- }
-
- /**
- * This method sends messages out to nodes instructing them
- * to stream the specified ranges to specified target nodes.
- */
- protected static void assignWork(Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget) throws IOException
- {
- assignWork(rangesWithSourceTarget, null);
- }
-
- /**
- * This method sends messages out to nodes instructing them
- * to stream the specified ranges to specified target nodes.
- */
- protected static void assignWork(Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget, List<EndPoint> filters) throws IOException
- {
- /*
- * Map whose key is the source node and the value is a map whose key is the
- * target and value is the list of ranges to be sent to it.
- */
- Map<EndPoint, Map<EndPoint, List<Range>>> rangeInfo = new HashMap<EndPoint, Map<EndPoint, List<Range>>>();
- Set<Range> ranges = rangesWithSourceTarget.keySet();
-
- for ( Range range : ranges )
- {
- List<BootstrapSourceTarget> rangeSourceTargets = rangesWithSourceTarget.get(range);
- for ( BootstrapSourceTarget rangeSourceTarget : rangeSourceTargets )
- {
- Map<EndPoint, List<Range>> targetRangeMap = rangeInfo.get(rangeSourceTarget.source_);
- if ( targetRangeMap == null )
- {
- targetRangeMap = new HashMap<EndPoint, List<Range>>();
- rangeInfo.put(rangeSourceTarget.source_, targetRangeMap);
- }
- List<Range> rangesToGive = targetRangeMap.get(rangeSourceTarget.target_);
- if ( rangesToGive == null )
- {
- rangesToGive = new ArrayList<Range>();
- targetRangeMap.put(rangeSourceTarget.target_, rangesToGive);
- }
- rangesToGive.add(range);
- }
- }
-
- Set<EndPoint> sources = rangeInfo.keySet();
- for ( EndPoint source : sources )
- {
- /* only send the message to the nodes that are in the filter. */
- if ( filters != null && filters.size() > 0 && !filters.contains(source) )
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Filtering endpoint " + source + " as source ...");
- continue;
- }
-
- Map<EndPoint, List<Range>> targetRangesMap = rangeInfo.get(source);
- Set<EndPoint> targets = targetRangesMap.keySet();
- List<BootstrapMetadata> bsmdList = new ArrayList<BootstrapMetadata>();
-
- for ( EndPoint target : targets )
- {
- List<Range> rangeForTarget = targetRangesMap.get(target);
- BootstrapMetadata bsMetadata = new BootstrapMetadata(target, rangeForTarget);
- bsmdList.add(bsMetadata);
- }
-
- BootstrapMetadataMessage bsMetadataMessage = new BootstrapMetadataMessage(bsmdList.toArray( new BootstrapMetadata[0] ) );
- /* Send this message to the source to do his shit. */
- Message message = BootstrapMetadataMessage.makeBootstrapMetadataMessage(bsMetadataMessage);
- if (logger_.isDebugEnabled())
- logger_.debug("Sending the BootstrapMetadataMessage to " + source);
- MessagingService.getMessagingInstance().sendOneWay(message, source);
- }
- }
-}
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+
+ import org.apache.log4j.Logger;
+
+ import org.apache.cassandra.net.EndPoint;
+ import org.apache.cassandra.net.Message;
+ import org.apache.cassandra.net.MessagingService;
+
+
+/**
+ * Static helpers for the leave/join protocol: splitting ranges at token
+ * boundaries, working out which endpoint hands a range over to which, and
+ * sending the resulting streaming instructions to the source nodes.
+ */
+class LeaveJoinProtocolHelper
+{
+    private static Logger logger_ = Logger.getLogger(LeaveJoinProtocolHelper.class);
+
+    /**
+     * Given a range a-------------b which is being split as
+     * a-----x-----y-----b then we want a mapping from
+     * (a, b] --> (a, x], (x, y], (y, b]
+     *
+     * The token array is copied and sorted before use, so the caller's array
+     * is not reordered. Ranges that contain none of the tokens do not appear
+     * in the result.
+     *
+     * NOTE(review): the bookkeeping assumes that the tokens contained in one
+     * range are contiguous in sorted order -- confirm for ranges that wrap.
+     *
+     * @param oldRanges the ranges to split
+     * @param allTokens candidate split points, in any order
+     * @return map from each old range that contains at least one token to its sub-ranges
+     */
+    protected static Map<Range, List<Range>> getRangeSplitRangeMapping(Range[] oldRanges, Token[] allTokens)
+    {
+        Map<Range, List<Range>> splitRanges = new HashMap<Range, List<Range>>();
+        Token[] tokens = new Token[allTokens.length];
+        System.arraycopy(allTokens, 0, tokens, 0, tokens.length);
+        Arrays.sort(tokens);
+
+        Range prevRange = null;
+        Token prevToken = null;
+        /* true while the range being processed still needs its closing (lastToken, right] piece */
+        boolean bVal = false;
+
+        for ( Range oldRange : oldRanges )
+        {
+            /* The previous range ran out of tokens before it was closed; close it now. */
+            if (bVal)
+            {
+                bVal = false;
+                List<Range> subRanges = splitRanges.get(prevRange);
+                if ( subRanges != null )
+                    subRanges.add( new Range(prevToken, prevRange.right()) );
+            }
+
+            prevRange = oldRange;
+            prevToken = oldRange.left();
+            for (Token token : tokens)
+            {
+                List<Range> subRanges = splitRanges.get(oldRange);
+                if ( oldRange.contains(token) )
+                {
+                    if ( subRanges == null )
+                    {
+                        subRanges = new ArrayList<Range>();
+                        splitRanges.put(oldRange, subRanges);
+                    }
+                    subRanges.add( new Range(prevToken, token) );
+                    prevToken = token;
+                    bVal = true;
+                }
+                else if ( bVal )
+                {
+                    /* First token past the range: emit the closing piece. subRanges is
+                     * non-null here because bVal is only set after the list was created. */
+                    bVal = false;
+                    subRanges.add( new Range(prevToken, oldRange.right()) );
+                }
+            }
+        }
+        /* This is to handle the last range being processed. */
+        if ( bVal )
+        {
+            bVal = false;
+            List<Range> subRanges = splitRanges.get(prevRange);
+            subRanges.add( new Range(prevToken, prevRange.right()) );
+        }
+        return splitRanges;
+    }
+
+    /**
+     * For every range of the old map, pairs the endpoints that are only in the
+     * old endpoint list (sources) with the endpoints that are only in the new
+     * endpoint list (targets), position by position.
+     *
+     * Endpoints that occur in both lists are removed from the lists held by
+     * both argument maps, so the maps' values are modified by this call.
+     *
+     * A range that has no entry in <code>newRangeToEndPointMap</code> is logged
+     * and mapped to an empty list. If a range has fewer targets than sources,
+     * the surplus sources are logged and left unpaired.
+     *
+     * @param oldRangeToEndPointMap endpoints per range before the change
+     * @param newRangeToEndPointMap endpoints per range after the change
+     * @return for every range of the old map, the source/target pairs found for it
+     */
+    protected static Map<Range, List<BootstrapSourceTarget>> getRangeSourceTargetInfo(Map<Range, List<EndPoint>> oldRangeToEndPointMap, Map<Range, List<EndPoint>> newRangeToEndPointMap)
+    {
+        Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = new HashMap<Range, List<BootstrapSourceTarget>>();
+        /*
+         * Basically calculate for each range the endpoints handling the
+         * range in the old token set and in the new token set. Whoever
+         * gets bumped out of the top N will have to hand off that range
+         * to the new dude.
+         */
+        Set<Range> oldRangeSet = oldRangeToEndPointMap.keySet();
+        for (Range range : oldRangeSet)
+        {
+            if (logger_.isDebugEnabled())
+                logger_.debug("Attempting to figure out the dudes who are bumped out for " + range + " ...");
+            List<EndPoint> oldEndPoints = oldRangeToEndPointMap.get(range);
+            List<EndPoint> newEndPoints = newRangeToEndPointMap.get(range);
+            if ( newEndPoints != null )
+            {
+                /* Iterate over a snapshot because newEndPoints is modified inside the loop. */
+                for ( EndPoint newEndPoint : new ArrayList<EndPoint>(newEndPoints) )
+                {
+                    if ( oldEndPoints.contains(newEndPoint) )
+                    {
+                        oldEndPoints.remove(newEndPoint);
+                        newEndPoints.remove(newEndPoint);
+                    }
+                }
+            }
+            else
+            {
+                logger_.warn("Trespassing - scram");
+            }
+            if (logger_.isDebugEnabled())
+                logger_.debug("Done figuring out the dudes who are bumped out for range " + range + " ...");
+        }
+        for ( Range range : oldRangeSet )
+        {
+            List<EndPoint> oldEndPoints = oldRangeToEndPointMap.get(range);
+            List<EndPoint> newEndPoints = newRangeToEndPointMap.get(range);
+            List<BootstrapSourceTarget> srcTarget = new ArrayList<BootstrapSourceTarget>();
+            rangesWithSourceTarget.put(range, srcTarget);
+
+            /* Already warned about above; there is no target to pair with. */
+            if ( newEndPoints == null )
+                continue;
+
+            int i = 0;
+            for ( EndPoint oldEndPoint : oldEndPoints )
+            {
+                if ( i >= newEndPoints.size() )
+                {
+                    logger_.warn("No new endpoint left to pair with " + oldEndPoint + " for range " + range);
+                    break;
+                }
+                srcTarget.add( new BootstrapSourceTarget(oldEndPoint, newEndPoints.get(i++)) );
+            }
+        }
+        return rangesWithSourceTarget;
+    }
+
+    /**
+     * This method sends messages out to nodes instructing them
+     * to stream the specified ranges to specified target nodes.
+     * No source filter is applied.
+     *
+     * @param rangesWithSourceTarget source/target pairs per range
+     */
+    protected static void assignWork(Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget) throws IOException
+    {
+        assignWork(rangesWithSourceTarget, null);
+    }
+
+    /**
+     * This method sends messages out to nodes instructing them
+     * to stream the specified ranges to specified target nodes.
+     *
+     * @param rangesWithSourceTarget source/target pairs per range
+     * @param filters if non-null and non-empty, only sources contained in this
+     *                list are sent a message; otherwise every source is
+     */
+    protected static void assignWork(Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget, List<EndPoint> filters) throws IOException
+    {
+        /*
+         * Map whose key is the source node and the value is a map whose key is the
+         * target and value is the list of ranges to be sent to it.
+         */
+        Map<EndPoint, Map<EndPoint, List<Range>>> rangeInfo = new HashMap<EndPoint, Map<EndPoint, List<Range>>>();
+
+        for ( Map.Entry<Range, List<BootstrapSourceTarget>> rangeEntry : rangesWithSourceTarget.entrySet() )
+        {
+            Range range = rangeEntry.getKey();
+            for ( BootstrapSourceTarget rangeSourceTarget : rangeEntry.getValue() )
+            {
+                Map<EndPoint, List<Range>> targetRangeMap = rangeInfo.get(rangeSourceTarget.source_);
+                if ( targetRangeMap == null )
+                {
+                    targetRangeMap = new HashMap<EndPoint, List<Range>>();
+                    rangeInfo.put(rangeSourceTarget.source_, targetRangeMap);
+                }
+                List<Range> rangesToGive = targetRangeMap.get(rangeSourceTarget.target_);
+                if ( rangesToGive == null )
+                {
+                    rangesToGive = new ArrayList<Range>();
+                    targetRangeMap.put(rangeSourceTarget.target_, rangesToGive);
+                }
+                rangesToGive.add(range);
+            }
+        }
+
+        for ( Map.Entry<EndPoint, Map<EndPoint, List<Range>>> sourceEntry : rangeInfo.entrySet() )
+        {
+            EndPoint source = sourceEntry.getKey();
+            /* only send the message to the nodes that are in the filter. */
+            if ( filters != null && filters.size() > 0 && !filters.contains(source) )
+            {
+                if (logger_.isDebugEnabled())
+                    logger_.debug("Filtering endpoint " + source + " as source ...");
+                continue;
+            }
+
+            /* One BootstrapMetadata per target this source has to stream to. */
+            List<BootstrapMetadata> bsmdList = new ArrayList<BootstrapMetadata>();
+            for ( Map.Entry<EndPoint, List<Range>> targetEntry : sourceEntry.getValue().entrySet() )
+            {
+                bsmdList.add( new BootstrapMetadata(targetEntry.getKey(), targetEntry.getValue()) );
+            }
+
+            BootstrapMetadataMessage bsMetadataMessage = new BootstrapMetadataMessage(bsmdList.toArray( new BootstrapMetadata[0] ) );
+            /* Send this message to the source so that it can start streaming. */
+            Message message = BootstrapMetadataMessage.makeBootstrapMetadataMessage(bsMetadataMessage);
+            if (logger_.isDebugEnabled())
+                logger_.debug("Sending the BootstrapMetadataMessage to " + source);
+            MessagingService.getMessagingInstance().sendOneWay(message, source);
+        }
+    }
+}