You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC
svn commit: r749218 [16/34] - in /incubator/cassandra: branches/ dist/
nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/
trunk/src/org/apache/ trunk/src/org/apache/cassandra/
trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.IndexHelper;
+import org.apache.cassandra.io.SSTable;
+
+
+/**
+ * This class provides a filter for fitering out columns
+ * that are older than a specific time.
+ *
+ * @author pmalik
+ *
+ */
+class TimeFilter implements IFilter
+{
+ private long timeLimit_;
+ private boolean isDone_;
+
+ TimeFilter(long timeLimit)
+ {
+ timeLimit_ = timeLimit;
+ isDone_ = false;
+ }
+
+ public ColumnFamily filter(String cf, ColumnFamily columnFamily)
+ {
+ String[] values = RowMutation.getColumnAndColumnFamily(cf);
+ String cfName = columnFamily.name();
+ ColumnFamily filteredCf = new ColumnFamily(cfName);
+ if( values.length == 1 && !DatabaseDescriptor.getColumnType(cfName).equals("Super"))
+ {
+ Collection<IColumn> columns = columnFamily.getAllColumns();
+ int i =0;
+ for(IColumn column : columns)
+ {
+ if ( column.timestamp() >= timeLimit_ )
+ {
+ filteredCf.addColumn(column.name(), column);
+ ++i;
+ }
+ else
+ {
+ break;
+ }
+ }
+ if( i < columns.size() )
+ {
+ isDone_ = true;
+ }
+ }
+ else if ( values.length == 2 && DatabaseDescriptor.getColumnType(cfName).equals("Super") )
+ {
+ /*
+ * TODO : For super columns we need to re-visit this issue.
+ * For now this fn will set done to true if we are done with
+ * atleast one super column
+ */
+ Collection<IColumn> columns = columnFamily.getAllColumns();
+ for(IColumn column : columns)
+ {
+ SuperColumn superColumn = (SuperColumn)column;
+ SuperColumn filteredSuperColumn = new SuperColumn(superColumn.name());
+ filteredCf.addColumn(filteredSuperColumn.name(), filteredSuperColumn);
+ Collection<IColumn> subColumns = superColumn.getSubColumns();
+ int i = 0;
+ for(IColumn subColumn : subColumns)
+ {
+ if ( subColumn.timestamp() >= timeLimit_ )
+ {
+ filteredSuperColumn.addColumn(subColumn.name(), subColumn);
+ ++i;
+ }
+ else
+ {
+ break;
+ }
+ }
+ if( i < filteredSuperColumn.getColumnCount() )
+ {
+ isDone_ = true;
+ }
+ }
+ }
+ else
+ {
+ throw new UnsupportedOperationException();
+ }
+ return filteredCf;
+ }
+
+ public IColumn filter(IColumn column, DataInputStream dis) throws IOException
+ {
+ long timeStamp = 0;
+ /*
+ * If its a column instance we need the timestamp to verify if
+ * it should be filtered , but at this instance the timestamp is not read
+ * so we read the timestamp and set the buffer back so that the rest of desrialization
+ * logic does not change.
+ */
+ if(column instanceof Column)
+ {
+ dis.mark(1000);
+ dis.readBoolean();
+ timeStamp = dis.readLong();
+ dis.reset();
+ if( timeStamp < timeLimit_ )
+ {
+ isDone_ = true;
+ return null;
+ }
+ }
+ return column;
+ }
+
+
+ public boolean isDone()
+ {
+ return isDone_;
+ }
+
+ public DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException
+ {
+ return ssTable.next( key, cf, new IndexHelper.TimeRange( timeLimit_, System.currentTimeMillis() ) );
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/TouchMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/TouchMessage.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/TouchMessage.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/TouchMessage.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,109 @@
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+
+
+public class TouchMessage
+{
+
+private static ICompactSerializer<TouchMessage> serializer_;
+
+ static
+ {
+ serializer_ = new TouchMessageSerializer();
+ }
+
+ static ICompactSerializer<TouchMessage> serializer()
+ {
+ return serializer_;
+ }
+
+ public static Message makeTouchMessage(TouchMessage touchMessage) throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream( bos );
+ TouchMessage.serializer().serialize(touchMessage, dos);
+ Message message = new Message(StorageService.getLocalStorageEndPoint(), StorageService.readStage_, StorageService.touchVerbHandler_, new Object[]{bos.toByteArray()});
+ return message;
+ }
+
+ @XmlElement(name="Table")
+ private String table_;
+
+ @XmlElement(name="Key")
+ private String key_;
+
+ @XmlElement(name="fData")
+ private boolean fData_ = true;
+
+ private TouchMessage()
+ {
+ }
+
+ public TouchMessage(String table, String key)
+ {
+ table_ = table;
+ key_ = key;
+ }
+
+ public TouchMessage(String table, String key, boolean fData)
+ {
+ table_ = table;
+ key_ = key;
+ fData_ = fData;
+ }
+
+
+ String table()
+ {
+ return table_;
+ }
+
+ String key()
+ {
+ return key_;
+ }
+
+ public boolean isData()
+ {
+ return fData_;
+ }
+}
+
+class TouchMessageSerializer implements ICompactSerializer<TouchMessage>
+{
+ public void serialize(TouchMessage tm, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(tm.table());
+ dos.writeUTF(tm.key());
+ dos.writeBoolean(tm.isData());
+ }
+
+ public TouchMessage deserialize(DataInputStream dis) throws IOException
+ {
+ String table = dis.readUTF();
+ String key = dis.readUTF();
+ boolean fData = dis.readBoolean();
+ TouchMessage tm = new TouchMessage( table, key, fData);
+ return tm;
+ }
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/TouchVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/TouchVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/TouchVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/TouchVerbHandler.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,58 @@
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+public class TouchVerbHandler implements IVerbHandler
+{
+ private static class ReadContext
+ {
+ protected DataInputBuffer bufIn_ = new DataInputBuffer();
+ }
+
+
+ private static Logger logger_ = Logger.getLogger( ReadVerbHandler.class );
+ /* We use this so that we can reuse the same row mutation context for the mutation. */
+ private static ThreadLocal<ReadContext> tls_ = new InheritableThreadLocal<ReadContext>();
+
+ public void doVerb(Message message)
+ {
+ byte[] body = (byte[])message.getMessageBody()[0];
+ /* Obtain a Read Context from TLS */
+ ReadContext readCtx = tls_.get();
+ if ( readCtx == null )
+ {
+ readCtx = new ReadContext();
+ tls_.set(readCtx);
+ }
+ readCtx.bufIn_.reset(body, body.length);
+
+ try
+ {
+ TouchMessage touchMessage = TouchMessage.serializer().deserialize(readCtx.bufIn_);
+ Table table = Table.open(touchMessage.table());
+ table.touch(touchMessage.key(), touchMessage.isData());
+ }
+ catch ( IOException ex)
+ {
+ logger_.info( LogUtil.throwableToString(ex) );
+ }
+ }
+
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/TypeInfo.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/TypeInfo.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/TypeInfo.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/TypeInfo.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
/**
 * Enumeration of primitive type tags with a stable one-byte wire value each.
 * The byte values form part of the serialized on-disk/on-wire format, so the
 * mapping must never change once data has been written.
 *
 * The original implementation maintained two parallel switch tables
 * (toByte and fromByte) that could silently drift apart; the mapping now
 * lives in a single per-constant field.
 */
public enum TypeInfo
{
    BYTE((byte) 1),
    CHAR((byte) 2),
    SHORT((byte) 3),
    INT((byte) 4),
    LONG((byte) 5),
    DOUBLE((byte) 6),
    FLOAT((byte) 7),
    STRING((byte) 8),
    BLOB((byte) 9);

    /* Stable wire value; deliberately decoupled from ordinal(). */
    private final byte value_;

    TypeInfo(byte value)
    {
        value_ = value;
    }

    /**
     * @param ti the type to encode
     * @return the one-byte wire value for the given type
     */
    public static byte toByte(TypeInfo ti)
    {
        return ti.value_;
    }

    /**
     * @param b a wire value previously produced by toByte
     * @return the matching TypeInfo, or null if b is not a known value
     *         (preserves the original switch-based behavior)
     */
    public static TypeInfo fromByte(byte b)
    {
        for ( TypeInfo ti : values() )
        {
            if ( ti.value_ == b )
            {
                return ti;
            }
        }
        return null;
    }
}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponseMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponseMessage.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponseMessage.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponseMessage.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+
+
+/*
+ * This message is sent back the row mutation verb handler
+ * and basically specifes if the write succeeded or not for a particular
+ * key in a table
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class WriteResponseMessage implements Serializable
+{
+private static ICompactSerializer<WriteResponseMessage> serializer_;
+
+ static
+ {
+ serializer_ = new WriteResponseMessageSerializer();
+ }
+
+ static ICompactSerializer<WriteResponseMessage> serializer()
+ {
+ return serializer_;
+ }
+
+ public static Message makeWriteResponseMessage(WriteResponseMessage writeResponseMessage) throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream( bos );
+ WriteResponseMessage.serializer().serialize(writeResponseMessage, dos);
+ Message message = new Message(StorageService.getLocalStorageEndPoint(), MessagingService.responseStage_, MessagingService.responseVerbHandler_, new Object[]{bos.toByteArray()});
+ return message;
+ }
+
+ @XmlElement(name = "Table")
+ private String table_;
+
+ @XmlElement(name = "key")
+ private String key_;
+
+ @XmlElement(name = "Status")
+ private boolean status_;
+
+ private WriteResponseMessage() {
+ }
+
+ public WriteResponseMessage(String table, String key, boolean bVal) {
+ table_ = table;
+ key_ = key;
+ status_ = bVal;
+ }
+
+ public String table()
+ {
+ return table_;
+ }
+
+ public String key()
+ {
+ return key_;
+ }
+
+ public boolean isSuccess()
+ {
+ return status_;
+ }
+}
+
+class WriteResponseMessageSerializer implements ICompactSerializer<WriteResponseMessage>
+{
+ public void serialize(WriteResponseMessage wm, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(wm.table());
+ dos.writeUTF(wm.key());
+ dos.writeBoolean(wm.isSuccess());
+ }
+
+ public WriteResponseMessage deserialize(DataInputStream dis) throws IOException
+ {
+ String table = dis.readUTF();
+ String key = dis.readUTF();
+ boolean status = dis.readBoolean();
+ return new WriteResponseMessage(table, key, status);
+ }
+}
\ No newline at end of file
Added: incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,150 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This class handles the boostrapping responsibilities for
+ * any new endpoint.
+*/
+public class BootStrapper implements Runnable
+{
+ private static Logger logger_ = Logger.getLogger(BootStrapper.class);
+ /* endpoints that need to be bootstrapped */
+ protected EndPoint[] targets_ = new EndPoint[0];
+ /* tokens of the nodes being bootstapped. */
+ protected BigInteger[] tokens_ = new BigInteger[0];
+ protected TokenMetadata tokenMetadata_ = null;
+ private List<EndPoint> filters_ = new ArrayList<EndPoint>();
+
+ public BootStrapper(EndPoint[] target, BigInteger[] token)
+ {
+ targets_ = target;
+ tokens_ = token;
+ tokenMetadata_ = StorageService.instance().getTokenMetadata();
+ }
+
+ public BootStrapper(EndPoint[] target, BigInteger[] token, EndPoint[] filters)
+ {
+ this(target, token);
+ Collections.addAll(filters_, filters);
+ }
+
+ public void run()
+ {
+ try
+ {
+ logger_.debug("Beginning bootstrap process for " + targets_ + " ...");
+ /* copy the token to endpoint map */
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ /* remove the tokens associated with the endpoints being bootstrapped */
+ for ( BigInteger token : tokens_ )
+ {
+ tokenToEndPointMap.remove(token);
+ }
+
+ Set<BigInteger> oldTokens = new HashSet<BigInteger>( tokenToEndPointMap.keySet() );
+ Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
+ logger_.debug("Total number of old ranges " + oldRanges.length);
+ /*
+ * Find the ranges that are split. Maintain a mapping between
+ * the range being split and the list of subranges.
+ */
+ Map<Range, List<Range>> splitRanges = LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRanges, tokens_);
+ /* Calculate the list of nodes that handle the old ranges */
+ Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges, tokenToEndPointMap);
+ /* Mapping of split ranges to the list of endpoints responsible for the range */
+ Map<Range, List<EndPoint>> replicasForSplitRanges = new HashMap<Range, List<EndPoint>>();
+ Set<Range> rangesSplit = splitRanges.keySet();
+ for ( Range splitRange : rangesSplit )
+ {
+ replicasForSplitRanges.put( splitRange, oldRangeToEndPointMap.get(splitRange) );
+ }
+ /* Remove the ranges that are split. */
+ for ( Range splitRange : rangesSplit )
+ {
+ oldRangeToEndPointMap.remove(splitRange);
+ }
+
+ /* Add the subranges of the split range to the map with the same replica set. */
+ for ( Range splitRange : rangesSplit )
+ {
+ List<Range> subRanges = splitRanges.get(splitRange);
+ List<EndPoint> replicas = replicasForSplitRanges.get(splitRange);
+ for ( Range subRange : subRanges )
+ {
+ /* Make sure we clone or else we are hammered. */
+ oldRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
+ }
+ }
+
+ /* Add the new token and re-calculate the range assignments */
+ Collections.addAll( oldTokens, tokens_ );
+ Range[] newRanges = StorageService.instance().getAllRanges(oldTokens);
+
+ logger_.debug("Total number of new ranges " + newRanges.length);
+ /* Calculate the list of nodes that handle the new ranges */
+ Map<Range, List<EndPoint>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges);
+ /* Calculate ranges that need to be sent and from whom to where */
+ Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = LeaveJoinProtocolHelper.getRangeSourceTargetInfo(oldRangeToEndPointMap, newRangeToEndPointMap);
+ /* Send messages to respective folks to stream data over to the new nodes being bootstrapped */
+ LeaveJoinProtocolHelper.assignWork(rangesWithSourceTarget, filters_);
+ }
+ catch ( Throwable th )
+ {
+ logger_.debug( LogUtil.throwableToString(th) );
+ }
+ }
+
+ private Range getMyOldRange()
+ {
+ Map<EndPoint, BigInteger> oldEndPointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
+ Map<BigInteger, EndPoint> oldTokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+
+ oldEndPointToTokenMap.remove(targets_);
+ oldTokenToEndPointMap.remove(tokens_);
+
+ BigInteger myToken = oldEndPointToTokenMap.get(StorageService.getLocalStorageEndPoint());
+ List<BigInteger> allTokens = new ArrayList<BigInteger>(oldTokenToEndPointMap.keySet());
+ Collections.sort(allTokens);
+ int index = Collections.binarySearch(allTokens, myToken);
+ /* Calculate the lhs for the range */
+ BigInteger lhs = (index == 0) ? allTokens.get(allTokens.size() - 1) : allTokens.get( index - 1);
+ return new Range( lhs, myToken );
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapInitiateMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapInitiateMessage.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapInitiateMessage.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapInitiateMessage.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.net.io.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class BootstrapInitiateMessage implements Serializable
+{
+ private static ICompactSerializer<BootstrapInitiateMessage> serializer_;
+
+ static
+ {
+ serializer_ = new BootstrapInitiateMessageSerializer();
+ }
+
+ public static ICompactSerializer<BootstrapInitiateMessage> serializer()
+ {
+ return serializer_;
+ }
+
+ public static Message makeBootstrapInitiateMessage(BootstrapInitiateMessage biMessage) throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream( bos );
+ BootstrapInitiateMessage.serializer().serialize(biMessage, dos);
+ return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateVerbHandler_, new Object[]{bos.toByteArray()} );
+ }
+
+ protected StreamContextManager.StreamContext[] streamContexts_ = new StreamContextManager.StreamContext[0];
+
+ public BootstrapInitiateMessage(StreamContextManager.StreamContext[] streamContexts)
+ {
+ streamContexts_ = streamContexts;
+ }
+
+ public StreamContextManager.StreamContext[] getStreamContext()
+ {
+ return streamContexts_;
+ }
+}
+
+class BootstrapInitiateMessageSerializer implements ICompactSerializer<BootstrapInitiateMessage>
+{
+ public void serialize(BootstrapInitiateMessage bim, DataOutputStream dos) throws IOException
+ {
+ dos.writeInt(bim.streamContexts_.length);
+ for ( StreamContextManager.StreamContext streamContext : bim.streamContexts_ )
+ {
+ StreamContextManager.StreamContext.serializer().serialize(streamContext, dos);
+ }
+ }
+
+ public BootstrapInitiateMessage deserialize(DataInputStream dis) throws IOException
+ {
+ int size = dis.readInt();
+ StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[0];
+ if ( size > 0 )
+ {
+ streamContexts = new StreamContextManager.StreamContext[size];
+ for ( int i = 0; i < size; ++i )
+ {
+ streamContexts[i] = StreamContextManager.StreamContext.serializer().deserialize(dis);
+ }
+ }
+ return new BootstrapInitiateMessage(streamContexts);
+ }
+}
+
Added: incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapMetadata.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapMetadata.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapMetadata.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,102 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+
+
+
+/**
+ * This encapsulates information of the list of
+ * ranges that a target node requires in order to
+ * be bootstrapped. This will be bundled in a
+ * BootstrapMetadataMessage and sent to nodes that
+ * are going to handoff the data.
+*/
+class BootstrapMetadata
+{
+ private static ICompactSerializer<BootstrapMetadata> serializer_;
+ static
+ {
+ serializer_ = new BootstrapMetadataSerializer();
+ }
+
+ protected static ICompactSerializer<BootstrapMetadata> serializer()
+ {
+ return serializer_;
+ }
+
+ protected EndPoint target_;
+ protected List<Range> ranges_;
+
+ BootstrapMetadata(EndPoint target, List<Range> ranges)
+ {
+ target_ = target;
+ ranges_ = ranges;
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder("");
+ sb.append(target_);
+ sb.append("------->");
+ for ( Range range : ranges_ )
+ {
+ sb.append(range);
+ sb.append(" ");
+ }
+ return sb.toString();
+ }
+}
+
+class BootstrapMetadataSerializer implements ICompactSerializer<BootstrapMetadata>
+{
+ public void serialize(BootstrapMetadata bsMetadata, DataOutputStream dos) throws IOException
+ {
+ CompactEndPointSerializationHelper.serialize(bsMetadata.target_, dos);
+ int size = (bsMetadata.ranges_ == null) ? 0 : bsMetadata.ranges_.size();
+ dos.writeInt(size);
+
+ for ( Range range : bsMetadata.ranges_ )
+ {
+ Range.serializer().serialize(range, dos);
+ }
+ }
+
+ public BootstrapMetadata deserialize(DataInputStream dis) throws IOException
+ {
+ EndPoint target = CompactEndPointSerializationHelper.deserialize(dis);
+ int size = dis.readInt();
+ List<Range> ranges = (size == 0) ? null : new ArrayList<Range>();
+ for( int i = 0; i < size; ++i )
+ {
+ ranges.add(Range.serializer().deserialize(dis));
+ }
+ return new BootstrapMetadata( target, ranges );
+ }
+}
+
Added: incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapMetadataMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapMetadataMessage.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapMetadataMessage.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapMetadataMessage.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,90 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+
+
+
+/**
+ * This class encapsulates the message that needs to be sent
+ * to nodes that handoff data. The message contains information
+ * about the node to be bootstrapped and the ranges with which
+ * it needs to be bootstrapped.
+*/
+class BootstrapMetadataMessage
+{
+ private static ICompactSerializer<BootstrapMetadataMessage> serializer_;
+ static
+ {
+ serializer_ = new BootstrapMetadataMessageSerializer();
+ }
+
+ protected static ICompactSerializer<BootstrapMetadataMessage> serializer()
+ {
+ return serializer_;
+ }
+
+ protected static Message makeBootstrapMetadataMessage(BootstrapMetadataMessage bsMetadataMessage) throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream( bos );
+ BootstrapMetadataMessage.serializer().serialize(bsMetadataMessage, dos);
+ return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bsMetadataVerbHandler_, new Object[]{bos.toByteArray()} );
+ }
+
+ protected BootstrapMetadata[] bsMetadata_ = new BootstrapMetadata[0];
+
+ BootstrapMetadataMessage(BootstrapMetadata[] bsMetadata)
+ {
+ bsMetadata_ = bsMetadata;
+ }
+}
+
+class BootstrapMetadataMessageSerializer implements ICompactSerializer<BootstrapMetadataMessage>
+{
+ public void serialize(BootstrapMetadataMessage bsMetadataMessage, DataOutputStream dos) throws IOException
+ {
+ BootstrapMetadata[] bsMetadata = bsMetadataMessage.bsMetadata_;
+ int size = (bsMetadata == null) ? 0 : bsMetadata.length;
+ dos.writeInt(size);
+ for ( BootstrapMetadata bsmd : bsMetadata )
+ {
+ BootstrapMetadata.serializer().serialize(bsmd, dos);
+ }
+ }
+
+ public BootstrapMetadataMessage deserialize(DataInputStream dis) throws IOException
+ {
+ int size = dis.readInt();
+ BootstrapMetadata[] bsMetadata = new BootstrapMetadata[size];
+ for ( int i = 0; i < size; ++i )
+ {
+ bsMetadata[i] = BootstrapMetadata.serializer().deserialize(dis);
+ }
+ return new BootstrapMetadataMessage(bsMetadata);
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,163 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.service.StreamManager;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * This verb handler handles the BootstrapMetadataMessage that is sent
+ * by the leader to the nodes that are responsible for handing off data.
+*/
+public class BootstrapMetadataVerbHandler implements IVerbHandler
+{
+ private static Logger logger_ = Logger.getLogger(BootstrapMetadataVerbHandler.class);
+
+ public void doVerb(Message message)
+ {
+ logger_.debug("Received a BootstrapMetadataMessage from " + message.getFrom());
+ byte[] body = (byte[])message.getMessageBody()[0];
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(body, body.length);
+ try
+ {
+ BootstrapMetadataMessage bsMetadataMessage = BootstrapMetadataMessage.serializer().deserialize(bufIn);
+ BootstrapMetadata[] bsMetadata = bsMetadataMessage.bsMetadata_;
+
+ /*
+ * This is for debugging purposes. Remove later.
+ */
+ for ( BootstrapMetadata bsmd : bsMetadata )
+ {
+ logger_.debug(bsmd.toString());
+ }
+
+ for ( BootstrapMetadata bsmd : bsMetadata )
+ {
+ long startTime = System.currentTimeMillis();
+ doTransfer(bsmd.target_, bsmd.ranges_);
+ logger_.debug("Time taken to boostrap " +
+ bsmd.target_ +
+ " is " +
+ (System.currentTimeMillis() - startTime) +
+ " msecs.");
+ }
+ }
+ catch ( IOException ex )
+ {
+ logger_.info(LogUtil.throwableToString(ex));
+ }
+ }
+
+ /*
+ * This method needs to figure out the files on disk
+ * locally for each range and then stream them using
+ * the Bootstrap protocol to the target endpoint.
+ */
+ private void doTransfer(EndPoint target, List<Range> ranges) throws IOException
+ {
+ if ( ranges.size() == 0 )
+ {
+ logger_.debug("No ranges to give scram ...");
+ return;
+ }
+
+ /* Just for debugging process - remove later */
+ for ( Range range : ranges )
+ {
+ StringBuilder sb = new StringBuilder("");
+ sb.append(range.toString());
+ sb.append(" ");
+ logger_.debug("Beginning transfer process to " + target + " for ranges " + sb.toString());
+ }
+
+ /*
+ * (1) First we dump all the memtables to disk.
+ * (2) Run a version of compaction which will basically
+ * put the keys in the range specified into a directory
+ * named as per the endpoint it is destined for inside the
+ * bootstrap directory.
+ * (3) Handoff the data.
+ */
+ List<String> tables = DatabaseDescriptor.getTables();
+ for ( String tName : tables )
+ {
+ Table table = Table.open(tName);
+ logger_.debug("Flushing memtables ...");
+ table.flush(false);
+ logger_.debug("Forcing compaction ...");
+ /* Get the counting bloom filter for each endpoint and the list of files that need to be streamed */
+ List<String> fileList = new ArrayList<String>();
+ boolean bVal = table.forceCompaction(ranges, target, fileList);
+ doHandoff(target, fileList);
+ }
+ }
+
+ /**
+ * Stream the files in the bootstrap directory over to the
+ * node being bootstrapped.
+ */
+ private void doHandoff(EndPoint target, List<String> fileList) throws IOException
+ {
+ List<File> filesList = new ArrayList<File>();
+ for(String file : fileList)
+ {
+ filesList.add(new File(file));
+ }
+ File[] files = filesList.toArray(new File[0]);
+ StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[files.length];
+ int i = 0;
+ for ( File file : files )
+ {
+ streamContexts[i] = new StreamContextManager.StreamContext(file.getAbsolutePath(), file.length());
+ logger_.debug("Stream context metadata " + streamContexts[i]);
+ ++i;
+ }
+
+ if ( files.length > 0 )
+ {
+ /* Set up the stream manager with the files that need to streamed */
+ StreamManager.instance(target).addFilesToStream(streamContexts);
+ /* Send the bootstrap initiate message */
+ BootstrapInitiateMessage biMessage = new BootstrapInitiateMessage(streamContexts);
+ Message message = BootstrapInitiateMessage.makeBootstrapInitiateMessage(biMessage);
+ logger_.debug("Sending a bootstrap initiate message to " + target + " ...");
+ MessagingService.getMessagingInstance().sendOneWay(message, target);
+ logger_.debug("Waiting for transfer to " + target + " to complete");
+ StreamManager.instance(target).waitForStreamCompletion();
+ logger_.debug("Done with transfer to " + target);
+ }
+ }
+}
+
Added: incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapSourceTarget.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapSourceTarget.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapSourceTarget.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootstrapSourceTarget.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,49 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * This class encapsulates who is the source and the
+ * target of a bootstrap for a particular range.
+ */
+class BootstrapSourceTarget
+{
+ protected EndPoint source_;
+ protected EndPoint target_;
+
+ BootstrapSourceTarget(EndPoint source, EndPoint target)
+ {
+ source_ = source;
+ target_ = target;
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder("");
+ sb.append("SOURCE: ");
+ sb.append(source_);
+ sb.append(" ----> ");
+ sb.append("TARGET: ");
+ sb.append(target_);
+ sb.append(" ");
+ return sb.toString();
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,225 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Static helpers for the leave/join protocol: splitting ranges at new token
+ * positions, pairing old replicas with new replicas per range, and sending
+ * out the resulting streaming instructions.
+ */
+class LeaveJoinProtocolHelper
+{
+ private static Logger logger_ = Logger.getLogger(LeaveJoinProtocolHelper.class);
+
+ /**
+ * Give a range a-------------b which is being split as
+ * a-----x-----y-----b then we want a mapping from
+ * (a, b] --> (a, x], (x, y], (y, b]
+ * @param oldRanges the ranges of the current configuration.
+ * @param allTokens the new token positions inducing the splits; copied and
+ * sorted locally, so the caller's array is left untouched.
+ * @return map from each range containing at least one new token to the list
+ * of sub-ranges that partition it; ranges with no new token are absent.
+ */
+ protected static Map<Range, List<Range>> getRangeSplitRangeMapping(Range[] oldRanges, BigInteger[] allTokens)
+ {
+ Map<Range, List<Range>> splitRanges = new HashMap<Range, List<Range>>();
+ BigInteger[] tokens = new BigInteger[allTokens.length];
+ System.arraycopy(allTokens, 0, tokens, 0, tokens.length);
+ Arrays.sort(tokens);
+
+ Range prevRange = null;
+ BigInteger prevToken = null;
+ // bVal is true while the tail sub-range (prevToken, right] of the range
+ // currently being split has not yet been emitted.
+ boolean bVal = false;
+
+ for ( Range oldRange : oldRanges )
+ {
+ if ( bVal && prevRange != null )
+ {
+ // Close out the previous range by emitting its final sub-range.
+ bVal = false;
+ List<Range> subRanges = splitRanges.get(prevRange);
+ if ( subRanges != null )
+ subRanges.add( new Range(prevToken, prevRange.right()) );
+ }
+
+ prevRange = oldRange;
+ prevToken = oldRange.left();
+ for ( BigInteger token : tokens )
+ {
+ List<Range> subRanges = splitRanges.get(oldRange);
+ if ( oldRange.contains(token) )
+ {
+ if ( subRanges == null )
+ {
+ subRanges = new ArrayList<Range>();
+ splitRanges.put(oldRange, subRanges);
+ }
+ subRanges.add( new Range(prevToken, token) );
+ prevToken = token;
+ bVal = true;
+ }
+ else
+ {
+ if ( bVal )
+ {
+ // bVal implies a token of this range was seen above, so the
+ // splitRanges entry (and hence subRanges) is non-null here.
+ bVal = false;
+ subRanges.add( new Range(prevToken, oldRange.right()) );
+ }
+ }
+ }
+ }
+ /* This is to handle the last range being processed. */
+ if ( bVal )
+ {
+ bVal = false;
+ List<Range> subRanges = splitRanges.get(prevRange);
+ subRanges.add( new Range(prevToken, prevRange.right()) );
+ }
+ return splitRanges;
+ }
+
+ /**
+ * For every range, work out which endpoint must stream it (source) and which
+ * newly responsible endpoint receives it (target).
+ * NOTE(review): the endpoint lists inside BOTH argument maps are mutated
+ * (common endpoints are removed) -- callers should pass disposable copies.
+ * @param oldRangeToEndPointMap replicas per range in the old configuration.
+ * @param newRangeToEndPointMap replicas per range in the new configuration.
+ * @return map from range to its (source, target) pairs.
+ */
+ protected static Map<Range, List<BootstrapSourceTarget>> getRangeSourceTargetInfo(Map<Range, List<EndPoint>> oldRangeToEndPointMap, Map<Range, List<EndPoint>> newRangeToEndPointMap)
+ {
+ Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = new HashMap<Range, List<BootstrapSourceTarget>>();
+ /*
+ * Basically calculate for each range the endpoints handling the
+ * range in the old token set and in the new token set. Whoever
+ * gets bumped out of the top N will have to hand off that range
+ * to the new dude.
+ */
+ Set<Range> oldRangeSet = oldRangeToEndPointMap.keySet();
+ for(Range range : oldRangeSet)
+ {
+ logger_.debug("Attempting to figure out the dudes who are bumped out for " + range + " ...");
+ List<EndPoint> oldEndPoints = oldRangeToEndPointMap.get(range);
+ List<EndPoint> newEndPoints = newRangeToEndPointMap.get(range);
+ if ( newEndPoints != null )
+ {
+ // Drop endpoints responsible for the range in both configurations;
+ // iterate a copy because newEndPoints is modified while walking.
+ List<EndPoint> newEndPoints2 = new ArrayList<EndPoint>(newEndPoints);
+ for ( EndPoint newEndPoint : newEndPoints2 )
+ {
+ if ( oldEndPoints.contains(newEndPoint) )
+ {
+ oldEndPoints.remove(newEndPoint);
+ newEndPoints.remove(newEndPoint);
+ }
+ }
+ }
+ else
+ {
+ logger_.warn("Trespassing - scram");
+ }
+ logger_.debug("Done figuring out the dudes who are bumped out for range " + range + " ...");
+ }
+ for ( Range range : oldRangeSet )
+ {
+ List<EndPoint> oldEndPoints = oldRangeToEndPointMap.get(range);
+ List<EndPoint> newEndPoints = newRangeToEndPointMap.get(range);
+ List<BootstrapSourceTarget> srcTarget = rangesWithSourceTarget.get(range);
+ if ( srcTarget == null )
+ {
+ srcTarget = new ArrayList<BootstrapSourceTarget>();
+ rangesWithSourceTarget.put(range, srcTarget);
+ }
+ int i = 0;
+ // Pair leftover old replicas with leftover new replicas positionally.
+ // NOTE(review): assumes newEndPoints is non-null and at least as long as
+ // oldEndPoints whenever oldEndPoints is non-empty; otherwise this throws.
+ // Confirm the caller guarantees that invariant.
+ for ( EndPoint oldEndPoint : oldEndPoints )
+ {
+ srcTarget.add( new BootstrapSourceTarget(oldEndPoint, newEndPoints.get(i++)) );
+ }
+ }
+ return rangesWithSourceTarget;
+ }
+
+ /**
+ * This method sends messages out to nodes instructing them
+ * to stream the specified ranges to specified target nodes.
+ */
+ protected static void assignWork(Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget) throws IOException
+ {
+ assignWork(rangesWithSourceTarget, null);
+ }
+
+ /**
+ * This method sends messages out to nodes instructing them
+ * to stream the specified ranges to specified target nodes.
+ * @param filters if non-null and non-empty, only source endpoints contained
+ * in this list are sent a message; all others are skipped.
+ */
+ protected static void assignWork(Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget, List<EndPoint> filters) throws IOException
+ {
+ /*
+ * Map whose key is the source node and the value is a map whose key is the
+ * target and value is the list of ranges to be sent to it.
+ */
+ Map<EndPoint, Map<EndPoint, List<Range>>> rangeInfo = new HashMap<EndPoint, Map<EndPoint, List<Range>>>();
+ Set<Range> ranges = rangesWithSourceTarget.keySet();
+
+ // Invert range -> (source, target) pairs into source -> (target -> ranges).
+ for ( Range range : ranges )
+ {
+ List<BootstrapSourceTarget> rangeSourceTargets = rangesWithSourceTarget.get(range);
+ for ( BootstrapSourceTarget rangeSourceTarget : rangeSourceTargets )
+ {
+ Map<EndPoint, List<Range>> targetRangeMap = rangeInfo.get(rangeSourceTarget.source_);
+ if ( targetRangeMap == null )
+ {
+ targetRangeMap = new HashMap<EndPoint, List<Range>>();
+ rangeInfo.put(rangeSourceTarget.source_, targetRangeMap);
+ }
+ List<Range> rangesToGive = targetRangeMap.get(rangeSourceTarget.target_);
+ if ( rangesToGive == null )
+ {
+ rangesToGive = new ArrayList<Range>();
+ targetRangeMap.put(rangeSourceTarget.target_, rangesToGive);
+ }
+ rangesToGive.add(range);
+ }
+ }
+
+ Set<EndPoint> sources = rangeInfo.keySet();
+ for ( EndPoint source : sources )
+ {
+ /* only send the message to the nodes that are in the filter. */
+ if ( filters != null && filters.size() > 0 && !filters.contains(source) )
+ {
+ logger_.debug("Filtering endpoint " + source + " as source ...");
+ continue;
+ }
+
+ Map<EndPoint, List<Range>> targetRangesMap = rangeInfo.get(source);
+ Set<EndPoint> targets = targetRangesMap.keySet();
+ List<BootstrapMetadata> bsmdList = new ArrayList<BootstrapMetadata>();
+
+ for ( EndPoint target : targets )
+ {
+ List<Range> rangeForTarget = targetRangesMap.get(target);
+ BootstrapMetadata bsMetadata = new BootstrapMetadata(target, rangeForTarget);
+ bsmdList.add(bsMetadata);
+ }
+
+ BootstrapMetadataMessage bsMetadataMessage = new BootstrapMetadataMessage(bsmdList.toArray( new BootstrapMetadata[0] ) );
+ /* Send this message to the source to do his shit. */
+ Message message = BootstrapMetadataMessage.makeBootstrapMetadataMessage(bsMetadataMessage);
+ logger_.debug("Sending the BootstrapMetadataMessage to " + source);
+ MessagingService.getMessagingInstance().sendOneWay(message, source);
+ }
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,291 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This class performs the exact opposite of the
+ * operations of the Bootstrapper class. Given
+ * a bunch of nodes that need to move it determines
+ * who they need to hand off data in terms of ranges.
+*/
+public class LeaveJoinProtocolImpl implements Runnable
+{
+ private static Logger logger_ = Logger.getLogger(LeaveJoinProtocolImpl.class);
+
+ /* endpoints that are to be moved. */
+ protected EndPoint[] targets_ = new EndPoint[0];
+ /* position where they need to be moved */
+ protected BigInteger[] tokens_ = new BigInteger[0];
+ /* token metadata information */
+ protected TokenMetadata tokenMetadata_ = null;
+
+ /**
+ * @param targets endpoints that are moving; targets[i] moves to tokens[i],
+ * so both arrays must have the same length.
+ * @param tokens the new token positions for the moving endpoints.
+ */
+ public LeaveJoinProtocolImpl(EndPoint[] targets, BigInteger[] tokens)
+ {
+ targets_ = targets;
+ tokens_ = tokens;
+ tokenMetadata_ = StorageService.instance().getTokenMetadata();
+ }
+
+ /**
+ * Computes the old and new range-to-replica maps implied by moving the
+ * target endpoints to their new tokens, derives who must stream which
+ * range to whom, and dispatches the streaming instructions. Any error is
+ * logged and swallowed (the Runnable contract forbids checked throws).
+ */
+ public void run()
+ {
+ try
+ {
+ logger_.debug("Beginning leave/join process for ...");
+ /* copy the token to endpoint map */
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ /* copy the endpoint to token map */
+ Map<EndPoint, BigInteger> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
+
+ Set<BigInteger> oldTokens = new HashSet<BigInteger>( tokenToEndPointMap.keySet() );
+ Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
+ logger_.debug("Total number of old ranges " + oldRanges.length);
+ /* Calculate the list of nodes that handle the old ranges */
+ Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges);
+
+ /* Remove the tokens of the nodes leaving the ring */
+ Set<BigInteger> tokens = getTokensForLeavingNodes();
+ oldTokens.removeAll(tokens);
+ Range[] rangesAfterNodesLeave = StorageService.instance().getAllRanges(oldTokens);
+ /* Get expanded range to initial range mapping */
+ Map<Range, List<Range>> expandedRangeToOldRangeMap = getExpandedRangeToOldRangeMapping(oldRanges, rangesAfterNodesLeave);
+ /* add the new token positions to the old tokens set */
+ for ( BigInteger token : tokens_ )
+ oldTokens.add(token);
+ Range[] rangesAfterNodesJoin = StorageService.instance().getAllRanges(oldTokens);
+ /* replace the ranges that were split with the split ranges in the old configuration */
+ addSplitRangesToOldConfiguration(oldRangeToEndPointMap, rangesAfterNodesJoin);
+
+ /* Re-calculate the new ranges after the new token positions are added */
+ Range[] newRanges = StorageService.instance().getAllRanges(oldTokens);
+ /* Remove the old locations from tokenToEndPointMap and add the new locations they are moving to */
+ for ( int i = 0; i < targets_.length; ++i )
+ {
+ tokenToEndPointMap.remove( endpointToTokenMap.get(targets_[i]) );
+ tokenToEndPointMap.put(tokens_[i], targets_[i]);
+ }
+ /* Calculate the list of nodes that handle the new ranges */
+ Map<Range, List<EndPoint>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges, tokenToEndPointMap);
+ /* Remove any expanded ranges and replace them with ranges whose aggregate is the expanded range in the new configuration. */
+ removeExpandedRangesFromNewConfiguration(newRangeToEndPointMap, expandedRangeToOldRangeMap);
+ /* Calculate ranges that need to be sent and from whom to where */
+ Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = LeaveJoinProtocolHelper.getRangeSourceTargetInfo(oldRangeToEndPointMap, newRangeToEndPointMap);
+ /* For debug purposes only */
+ Set<Range> ranges = rangesWithSourceTarget.keySet();
+ for ( Range range : ranges )
+ {
+ System.out.print("RANGE: " + range + ":: ");
+ List<BootstrapSourceTarget> infos = rangesWithSourceTarget.get(range);
+ for ( BootstrapSourceTarget info : infos )
+ {
+ System.out.print(info);
+ System.out.print(" ");
+ }
+ System.out.println(System.getProperty("line.separator"));
+ }
+ /* Send messages to respective folks to stream data over to the new nodes being bootstrapped */
+ LeaveJoinProtocolHelper.assignWork(rangesWithSourceTarget);
+ }
+ catch ( Throwable th )
+ {
+ logger_.warn(LogUtil.throwableToString(th));
+ }
+ }
+
+ /**
+ * This method figures out the ranges that have been split and
+ * replaces them with the split range.
+ * @param oldRangeToEndPointMap old range mapped to their replicas.
+ * This map is modified in place: split ranges are removed and their
+ * sub-ranges inserted with the same replica set.
+ * @param rangesAfterNodesJoin ranges after the nodes have joined at
+ * their respective position.
+ */
+ private void addSplitRangesToOldConfiguration(Map<Range, List<EndPoint>> oldRangeToEndPointMap, Range[] rangesAfterNodesJoin)
+ {
+ /*
+ * Find the ranges that are split. Maintain a mapping between
+ * the range being split and the list of subranges.
+ */
+ Map<Range, List<Range>> splitRanges = LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRangeToEndPointMap.keySet().toArray( new Range[0] ), tokens_);
+ /* Mapping of split ranges to the list of endpoints responsible for the range */
+ Map<Range, List<EndPoint>> replicasForSplitRanges = new HashMap<Range, List<EndPoint>>();
+ Set<Range> rangesSplit = splitRanges.keySet();
+ for ( Range splitRange : rangesSplit )
+ {
+ replicasForSplitRanges.put( splitRange, oldRangeToEndPointMap.get(splitRange) );
+ }
+ /* Remove the ranges that are split. */
+ for ( Range splitRange : rangesSplit )
+ {
+ oldRangeToEndPointMap.remove(splitRange);
+ }
+
+ /* Add the subranges of the split range to the map with the same replica set. */
+ for ( Range splitRange : rangesSplit )
+ {
+ List<Range> subRanges = splitRanges.get(splitRange);
+ List<EndPoint> replicas = replicasForSplitRanges.get(splitRange);
+ for ( Range subRange : subRanges )
+ {
+ /* Make sure we clone or else we are hammered. */
+ oldRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
+ }
+ }
+ }
+
+ /**
+ * Reset the newRangeToEndPointMap and replace the expanded range
+ * with the ranges whose aggregate is the expanded range. This happens
+ * only when nodes leave the ring to migrate to a different position.
+ *
+ * @param newRangeToEndPointMap all new ranges mapped to the replicas
+ * responsible for those ranges. Modified in place.
+ * @param expandedRangeToOldRangeMap mapping between the expanded ranges
+ * and the ranges whose aggregate is the expanded range.
+ */
+ private void removeExpandedRangesFromNewConfiguration(Map<Range, List<EndPoint>> newRangeToEndPointMap, Map<Range, List<Range>> expandedRangeToOldRangeMap)
+ {
+ /* Get the replicas for the expanded ranges */
+ Map<Range, List<EndPoint>> replicasForExpandedRanges = new HashMap<Range, List<EndPoint>>();
+ Set<Range> expandedRanges = expandedRangeToOldRangeMap.keySet();
+ for ( Range expandedRange : expandedRanges )
+ {
+ replicasForExpandedRanges.put( expandedRange, newRangeToEndPointMap.get(expandedRange) );
+ newRangeToEndPointMap.remove(expandedRange);
+ }
+ /* replace the expanded ranges in the newRangeToEndPointMap with the subRanges */
+ for ( Range expandedRange : expandedRanges )
+ {
+ List<Range> subRanges = expandedRangeToOldRangeMap.get(expandedRange);
+ List<EndPoint> replicas = replicasForExpandedRanges.get(expandedRange);
+ for ( Range subRange : subRanges )
+ {
+ newRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
+ }
+ }
+ }
+
+ /**
+ * @return the current ring tokens of the endpoints that are leaving,
+ * looked up from the token metadata.
+ */
+ private Set<BigInteger> getTokensForLeavingNodes()
+ {
+ Set<BigInteger> tokens = new HashSet<BigInteger>();
+ for ( EndPoint target : targets_ )
+ {
+ tokens.add( tokenMetadata_.getToken(target) );
+ }
+ return tokens;
+ }
+
+ /**
+ * Here we are removing the nodes that need to leave the
+ * ring and trying to calculate what the ranges would look
+ * like w/o them. For eg if we remove two nodes A and D from
+ * the ring and the order of nodes on the ring is A, B, C
+ * and D. When B is removed the range of C is the old range
+ * of C and the old range of B. We want a mapping from old
+ * range of B to new range of B. We have
+ * A----B----C----D----E----F----G and we remove b and e
+ * then we want a mapping from (a, c] --> (a,b], (b, c] and
+ * (d, f] --> (d, e], (e, f].
+ * @param oldRanges ranges with the previous configuration
+ * @param newRanges ranges with the target endpoints removed.
+ * @return map of expanded range to the list whose aggregate is
+ * the expanded range.
+ */
+ protected static Map<Range, List<Range>> getExpandedRangeToOldRangeMapping(Range[] oldRanges, Range[] newRanges)
+ {
+ Map<Range, List<Range>> map = new HashMap<Range, List<Range>>();
+ List<Range> oRanges = new ArrayList<Range>();
+ Collections.addAll(oRanges, oldRanges);
+ List<Range> nRanges = new ArrayList<Range>();
+ Collections.addAll(nRanges, newRanges);
+
+ /*
+ * Remove the ranges that are the same.
+ * Now we will be left with the expanded
+ * ranges in the nRanges list and the
+ * smaller ranges in the oRanges list.
+ */
+ for( Range oRange : oldRanges )
+ {
+ boolean bVal = nRanges.remove(oRange);
+ if ( bVal )
+ oRanges.remove(oRange);
+ }
+
+ int nSize = nRanges.size();
+ int oSize = oRanges.size();
+ /*
+ * Establish the mapping between expanded ranges
+ * to the smaller ranges whose aggregate is the
+ * expanded range.
+ */
+ for ( int i = 0; i < nSize; ++i )
+ {
+ Range nRange = nRanges.get(i);
+ for ( int j = 0; j < oSize; ++j )
+ {
+ Range oRange = oRanges.get(j);
+ if ( nRange.contains(oRange.right()) )
+ {
+ List<Range> smallerRanges = map.get(nRange);
+ if ( smallerRanges == null )
+ {
+ smallerRanges = new ArrayList<Range>();
+ map.put(nRange, smallerRanges);
+ }
+ smallerRanges.add(oRange);
+ // NOTE(review): this continue is a no-op (last statement of the
+ // inner loop); possibly a break was intended -- confirm.
+ continue;
+ }
+ }
+ }
+
+ return map;
+ }
+
+ /* Ad-hoc manual smoke test: builds a toy 8-node ring and moves C and D. */
+ public static void main(String[] args) throws Throwable
+ {
+ StorageService ss = StorageService.instance();
+ ss.updateTokenMetadata(BigInteger.valueOf(3), new EndPoint("A", 7000));
+ ss.updateTokenMetadata(BigInteger.valueOf(6), new EndPoint("B", 7000));
+ ss.updateTokenMetadata(BigInteger.valueOf(9), new EndPoint("C", 7000));
+ ss.updateTokenMetadata(BigInteger.valueOf(12), new EndPoint("D", 7000));
+ ss.updateTokenMetadata(BigInteger.valueOf(15), new EndPoint("E", 7000));
+ ss.updateTokenMetadata(BigInteger.valueOf(18), new EndPoint("F", 7000));
+ ss.updateTokenMetadata(BigInteger.valueOf(21), new EndPoint("G", 7000));
+ ss.updateTokenMetadata(BigInteger.valueOf(24), new EndPoint("H", 7000));
+
+ Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new BigInteger[]{BigInteger.valueOf(22), BigInteger.valueOf(23)} );
+ runnable.run();
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.gms.GossipDigest;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+
+
+/**
+ * A representation of the range that a node is responsible for on the DHT ring.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Range implements Comparable<Range>
+{
+ private static ICompactSerializer<Range> serializer_;
+ static
+ {
+ serializer_ = new RangeSerializer();
+ }
+
+ public static ICompactSerializer<Range> serializer()
+ {
+ return serializer_;
+ }
+
+ public static boolean isKeyInRanges(List<Range> ranges, String key)
+ {
+ if(ranges == null )
+ return false;
+
+ for ( Range range : ranges)
+ {
+ if(range.contains(StorageService.hash(key)))
+ {
+ return true ;
+ }
+ }
+ return false;
+ }
+
+
+ private BigInteger left_;
+ private BigInteger right_;
+
+ public Range(BigInteger left, BigInteger right)
+ {
+ left_ = left;
+ right_ = right;
+ }
+
+ /**
+ * Returns the left endpoint of a range.
+ * @return left endpoint
+ */
+ public BigInteger left()
+ {
+ return left_;
+ }
+
+ /**
+ * Returns the right endpoint of a range.
+ * @return right endpoint
+ */
+ public BigInteger right()
+ {
+ return right_;
+ }
+
+ boolean isSplitRequired()
+ {
+ return ( left_.subtract(right_).signum() >= 0 );
+ }
+
+ public boolean isSplitBy(BigInteger bi)
+ {
+ if ( left_.subtract(right_).signum() > 0 )
+ {
+ /*
+ * left is greater than right we are wrapping around.
+ * So if the interval is [a,b) where a > b then we have
+ * 3 cases one of which holds for any given token k.
+ * (1) k > a -- return true
+ * (2) k < b -- return true
+ * (3) b < k < a -- return false
+ */
+ if ( bi.subtract(left_).signum() > 0 )
+ return true;
+ else if (right_.subtract(bi).signum() > 0 )
+ return true;
+ else
+ return false;
+ }
+ else if ( left_.subtract(right_).signum() < 0 )
+ {
+ /*
+ * This is the range [a, b) where a < b.
+ */
+ return ( bi.subtract(left_).signum() > 0 && right_.subtract(bi).signum() > 0 );
+ }
+ else
+ {
+ // should never be here.
+ return true;
+ }
+ }
+
+ /**
+ * Helps determine if a given point on the DHT ring is contained
+ * in the range in question.
+ * @param bi point in question
+ * @return true if the point contains within the range else false.
+ */
+ public boolean contains(BigInteger bi)
+ {
+ if ( left_.subtract(right_).signum() > 0 )
+ {
+ /*
+ * left is greater than right we are wrapping around.
+ * So if the interval is [a,b) where a > b then we have
+ * 3 cases one of which holds for any given token k.
+ * (1) k > a -- return true
+ * (2) k < b -- return true
+ * (3) b < k < a -- return false
+ */
+ if ( bi.subtract(left_).signum() >= 0 )
+ return true;
+ else if (right_.subtract(bi).signum() > 0 )
+ return true;
+ else
+ return false;
+ }
+ else if ( left_.subtract(right_).signum() < 0 )
+ {
+ /*
+ * This is the range [a, b) where a < b.
+ */
+ return ( bi.subtract(left_).signum() >= 0 && right_.subtract(bi).signum() >=0 );
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ /**
+ * Helps determine if a given range on the DHT ring is contained
+ * within the range associated with the <i>this</i> pointer.
+ * @param rhs rhs in question
+ * @return true if the point contains within the range else false.
+ */
+ public boolean contains(Range rhs)
+ {
+ /*
+ * If (a, b] and (c, d} are not wrap arounds
+ * then return true if a <= c <= d <= b.
+ */
+ if ( !isWrapAround(this) && !isWrapAround(rhs) )
+ {
+ if ( rhs.left_.subtract(left_).signum() >= 0 && right_.subtract(rhs.right_).signum() >= 0 )
+ return true;
+ else
+ return false;
+ }
+
+ /*
+ * If lhs is a wrap around and rhs is not then
+ * rhs.left >= lhs.left and rhs.right >= lhs.left.
+ */
+ if ( isWrapAround(this) && !isWrapAround(rhs) )
+ {
+ if ( rhs.left_.subtract(left_).signum() >= 0 && rhs.right_.subtract(right_).signum() >= 0 )
+ return true;
+ else
+ return false;
+ }
+
+ /*
+ * If lhs is not a wrap around and rhs is a wrap
+ * around then we just return false.
+ */
+ if ( !isWrapAround(this) && isWrapAround(rhs) )
+ return false;
+
+ if( isWrapAround(this) && isWrapAround(rhs) )
+ {
+ if ( rhs.left_.subtract(left_).signum() >= 0 && right_.subtract(right_).signum() >= 0 )
+ return true;
+ else
+ return false;
+ }
+
+ /* should never be here */
+ return false;
+ }
+
+ /**
+ * Tells if the given range is a wrap around.
+ * @param range
+ * @return
+ */
+ private boolean isWrapAround(Range range)
+ {
+ boolean bVal = ( range.left_.subtract(range.right_).signum() > 0 ) ? true : false;
+ return bVal;
+ }
+
+ public int compareTo(Range rhs)
+ {
+ /*
+ * If the range represented by the "this" pointer
+ * is a wrap around then it is the smaller one.
+ */
+ if ( isWrapAround(this) )
+ return -1;
+
+ if ( isWrapAround(rhs) )
+ return 1;
+
+ return right_.compareTo(rhs.right_);
+ }
+
+ public boolean equals(Object o)
+ {
+ if ( !(o instanceof Range) )
+ return false;
+ Range rhs = (Range)o;
+ if ( left_.equals(rhs.left_) && right_.equals(rhs.right_) )
+ return true;
+ else
+ return false;
+ }
+
+ public int hashCode()
+ {
+ return toString().hashCode();
+ }
+
+ public String toString()
+ {
+ return "(" + left_ + "," + right_ + "]";
+ }
+}
+
+class RangeSerializer implements ICompactSerializer<Range>
+{
+ public void serialize(Range range, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(range.left().toString());
+ dos.writeUTF(range.right().toString());
+ }
+
+ public Range deserialize(DataInputStream dis) throws IOException
+ {
+ BigInteger left = new BigInteger(dis.readUTF());
+ BigInteger right = new BigInteger(dis.readUTF());
+ return new Range(left, right);
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/ApplicationState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/ApplicationState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/ApplicationState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/ApplicationState.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+
+
+/**
+ * This abstraction represents the state associated with a particular node which an
+ * application wants to make available to the rest of the nodes in the cluster.
+ * Whenever a piece of state needs to be disseminated to the rest of the cluster, wrap
+ * the state in an instance of <i>ApplicationState</i> and add it to the Gossiper.
+ *
+ * For eg. if we want to disseminate load information for node A do the following:
+ *
+ * ApplicationState loadState = new ApplicationState(<string representation of load>);
+ * Gossiper.instance().addApplicationState("LOAD STATE", loadState);
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ApplicationState
+{
+ private static ICompactSerializer<ApplicationState> serializer_;
+ static
+ {
+ serializer_ = new ApplicationStateSerializer();
+ }
+
+ int version_;
+ String state_;
+
+
+ ApplicationState(String state, int version)
+ {
+ state_ = state;
+ version_ = version;
+ }
+
+ public static ICompactSerializer<ApplicationState> serializer()
+ {
+ return serializer_;
+ }
+
+ /**
+ * Wraps the specified state into a ApplicationState instance.
+ * @param state string representation of arbitrary state.
+ */
+ public ApplicationState(String state)
+ {
+ state_ = state;
+ version_ = VersionGenerator.getNextVersion();
+ }
+
+ public String getState()
+ {
+ return state_;
+ }
+
+ int getStateVersion()
+ {
+ return version_;
+ }
+}
+
+class ApplicationStateSerializer implements ICompactSerializer<ApplicationState>
+{
+ public void serialize(ApplicationState appState, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(appState.state_);
+ dos.writeInt(appState.version_);
+ }
+
+ public ApplicationState deserialize(DataInputStream dis) throws IOException
+ {
+ String state = dis.readUTF();
+ int version = dis.readInt();
+ return new ApplicationState(state, version);
+ }
+}
+