You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/02 07:13:17 UTC
svn commit: r749207 [2/12] - in
/incubator/cassandra/src/org/apache/cassandra: loader/ locator/ net/
net/http/ net/io/ net/sink/ procedures/ service/ test/ tools/
Added: incubator/cassandra/src/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/locator/RackAwareStrategy.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/locator/RackAwareStrategy.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/locator/RackAwareStrategy.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,205 @@
+package org.apache.cassandra.locator;
+
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+
+
+/*
+ * This class returns the nodes responsible for a given
+ * key and respects rack awareness. It makes a best
+ * effort to get a node from a different data center and
+ * a node in a different rack in the same datacenter as
+ * the primary.
+ */
+public class RackAwareStrategy extends AbstractStrategy
+{
+    public RackAwareStrategy(TokenMetadata tokenMetadata)
+    {
+        super(tokenMetadata);
+    }
+
+    /**
+     * Returns the endpoints that should store data for the given token.
+     * The first entry is the endpoint owning the first ring token that is
+     * greater than or equal to the given token (wrapping to the start of
+     * the ring). The remaining entries are chosen by walking the ring,
+     * preferring one endpoint in a different data center and one on a
+     * different rack, then filling up to the replication factor.
+     *
+     * @param token token whose replicas are wanted
+     * @return the chosen endpoints, primary first
+     */
+    public EndPoint[] getStorageEndPoints(BigInteger token)
+    {
+        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        Collections.sort(tokens);
+        return selectEndPoints(token, tokens, tokenToEndPointMap);
+    }
+
+    /**
+     * Computes the storage endpoints for every key. Each key is hashed
+     * with StorageService.hash and resolved independently against the
+     * same snapshot of the token map.
+     *
+     * @param keys keys whose replicas are wanted
+     * @return map from each key to its endpoints, primary first
+     */
+    public Map<String, EndPoint[]> getStorageEndPoints(String[] keys)
+    {
+        Map<String, EndPoint[]> results = new HashMap<String, EndPoint[]>();
+        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        Collections.sort(tokens);
+
+        for ( String key : keys )
+        {
+            // The selection state lives inside selectEndPoints, so nothing
+            // found for one key can leak into the result of the next one.
+            results.put(key, selectEndPoints(StorageService.hash(key), tokens, tokenToEndPointMap));
+        }
+
+        return results;
+    }
+
+    public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
+    {
+        throw new UnsupportedOperationException("This operation is not currently supported");
+    }
+
+    /**
+     * Picks the endpoints for one token from an already sorted token list.
+     *
+     * @param token token to place on the ring
+     * @param tokens ring tokens in ascending order
+     * @param tokenToEndPointMap owner of each ring token
+     * @return the chosen endpoints, primary first
+     */
+    private EndPoint[] selectEndPoints(BigInteger token, List<BigInteger> tokens, Map<BigInteger, EndPoint> tokenToEndPointMap)
+    {
+        int N = DatabaseDescriptor.getReplicationFactor();
+        int totalNodes = tokens.size();
+        int index = Collections.binarySearch(tokens, token);
+        if (index < 0)
+        {
+            // Not an exact match: convert to the insertion point and wrap
+            // around to the first token when it falls past the end.
+            index = (index + 1) * (-1);
+            if (index >= totalNodes)
+                index = 0;
+        }
+
+        // The node at the index is always part of the result.
+        EndPoint primary = tokenToEndPointMap.get(tokens.get(index));
+        List<EndPoint> list = new ArrayList<EndPoint>();
+        list.add(primary);
+        if ( N == 1 )
+        {
+            // NOTE(review): this path has never called retrofitPorts, unlike
+            // the N > 1 path below - confirm whether that is intentional.
+            return list.toArray(new EndPoint[0]);
+        }
+
+        int startIndex = (index + 1) % totalNodes;
+        IEndPointSnitch endPointSnitch = StorageService.instance().getEndPointSnitch();
+        boolean bDataCenter = false;
+        boolean bOtherRack = false;
+
+        for (int i = startIndex, count = 1; count < totalNodes && list.size() < N; ++count, i = (i + 1) % totalNodes)
+        {
+            EndPoint candidate = tokenToEndPointMap.get(tokens.get(i));
+            try
+            {
+                // First try to find one in a different data center.
+                if (!endPointSnitch.isInSameDataCenter(primary, candidate))
+                {
+                    // Only the first node from another data center is taken.
+                    if ( !bDataCenter )
+                    {
+                        list.add(candidate);
+                        bDataCenter = true;
+                    }
+                    continue;
+                }
+                // The candidate is in the primary's data center from here on,
+                // so only the rack needs to be compared.
+                if (!endPointSnitch.isOnSameRack(primary, candidate))
+                {
+                    // Only the first node from another rack is taken.
+                    if ( !bOtherRack )
+                    {
+                        list.add(candidate);
+                        bOtherRack = true;
+                    }
+                }
+            }
+            catch (UnknownHostException e)
+            {
+                // Best effort: a node whose location cannot be determined is skipped here.
+                logger_.debug(LogUtil.throwableToString(e));
+            }
+        }
+
+        // If fewer than N nodes were found, walk the ring again and add any
+        // node not already chosen until N nodes are present.
+        for (int i = startIndex, count = 1; count < totalNodes && list.size() < N; ++count, i = (i + 1) % totalNodes)
+        {
+            EndPoint candidate = tokenToEndPointMap.get(tokens.get(i));
+            if ( !list.contains(candidate) )
+            {
+                list.add(candidate);
+            }
+        }
+        retrofitPorts(list);
+        return list.toArray(new EndPoint[0]);
+    }
+}
\ No newline at end of file
Added: incubator/cassandra/src/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/locator/RackUnawareStrategy.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/locator/RackUnawareStrategy.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,154 @@
+package org.apache.cassandra.locator;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+
+
+/**
+ * This class returns the nodes responsible for a given
+ * key but does not respect rack awareness. Basically
+ * returns the N nodes (N being the configured replication
+ * factor) that lie right next to each other on the ring.
+ */
+public class RackUnawareStrategy extends AbstractStrategy
+{
+ /* Use this flag to check if initialization is in order. */
+ private AtomicBoolean initialized_ = new AtomicBoolean(false);
+ private Map<Range, List<EndPoint>> rangeToEndPointMap_;
+
+ public RackUnawareStrategy(TokenMetadata tokenMetadata)
+ {
+ super(tokenMetadata);
+ }
+
+ public EndPoint[] getStorageEndPoints(BigInteger token)
+ {
+ return getStorageEndPoints(token, tokenMetadata_.cloneTokenEndPointMap());
+ }
+
+ public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
+ {
+ int startIndex = 0 ;
+ List<EndPoint> list = new ArrayList<EndPoint>();
+ int foundCount = 0;
+ int N = DatabaseDescriptor.getReplicationFactor();
+ List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+ Collections.sort(tokens);
+ int index = Collections.binarySearch(tokens, token);
+ if(index < 0)
+ {
+ index = (index + 1) * (-1);
+ if (index >= tokens.size())
+ index = 0;
+ }
+ int totalNodes = tokens.size();
+ // Add the node at the index by default
+ list.add(tokenToEndPointMap.get(tokens.get(index)));
+ foundCount++;
+ startIndex = (index + 1)%totalNodes;
+ // If we found N number of nodes we are good. This loop will just exit. Otherwise just
+ // loop through the list and add until we have N nodes.
+ for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+ {
+ if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+ {
+ list.add(tokenToEndPointMap.get(tokens.get(i)));
+ foundCount++;
+ continue;
+ }
+ }
+ retrofitPorts(list);
+ return list.toArray(new EndPoint[0]);
+ }
+
+ private void doInitialization()
+ {
+ if ( !initialized_.get() )
+ {
+ /* construct the mapping from the ranges to the replicas responsible for them */
+ rangeToEndPointMap_ = StorageService.instance().getRangeToEndPointMap();
+ initialized_.set(true);
+ }
+ }
+
+ /**
+ * This method determines which range in the array actually contains
+ * the hash of the key
+ * @param ranges
+ * @param key
+ * @return
+ */
+ private int findRangeIndexForKey(Range[] ranges, String key)
+ {
+ int index = 0;
+ BigInteger hash = StorageService.hash(key);
+ for ( int i = 0; i < ranges.length; ++i )
+ {
+ if ( ranges[i].contains(hash) )
+ {
+ index = i;
+ break;
+ }
+ }
+
+ return index;
+ }
+
+ public Map<String, EndPoint[]> getStorageEndPoints(String[] keys)
+ {
+ Arrays.sort(keys);
+ Range[] ranges = StorageService.instance().getAllRanges();
+
+ Map<String, EndPoint[]> results = new HashMap<String, EndPoint[]>();
+ List<EndPoint> list = new ArrayList<EndPoint>();
+ int startIndex = 0 ;
+ int foundCount = 0;
+
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ int N = DatabaseDescriptor.getReplicationFactor();
+ List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+ Collections.sort(tokens);
+ for ( String key : keys )
+ {
+ BigInteger token = StorageService.hash(key);
+ int index = Collections.binarySearch(tokens, token);
+ if(index < 0)
+ {
+ index = (index + 1) * (-1);
+ if (index >= tokens.size())
+ index = 0;
+ }
+ int totalNodes = tokens.size();
+ // Add the node at the index by default
+ list.add(tokenToEndPointMap.get(tokens.get(index)));
+ foundCount++;
+ startIndex = (index + 1)%totalNodes;
+ // If we found N number of nodes we are good. This loop will just exit. Otherwise just
+ // loop through the list and add until we have N nodes.
+ for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+ {
+ if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+ {
+ list.add(tokenToEndPointMap.get(tokens.get(i)));
+ foundCount++;
+ continue;
+ }
+ }
+ retrofitPorts(list);
+ results.put(key, list.toArray(new EndPoint[0]));
+ }
+
+ return results;
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/locator/TokenMetadata.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/locator/TokenMetadata.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/locator/TokenMetadata.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TokenMetadata
+{
+    private static ICompactSerializer<TokenMetadata> serializer_ = new TokenMetadataSerializer();
+
+    public static ICompactSerializer<TokenMetadata> serializer()
+    {
+        return serializer_;
+    }
+
+    /* Maintains token to endpoint map of every node in the cluster. */
+    private Map<BigInteger, EndPoint> tokenToEndPointMap_ = new HashMap<BigInteger, EndPoint>();
+    /* Maintains a reverse index of endpoint to token in the cluster. */
+    private Map<EndPoint, BigInteger> endPointToTokenMap_ = new HashMap<EndPoint, BigInteger>();
+
+    /* Use this lock for manipulating the token map */
+    private final ReadWriteLock lock_ = new ReentrantReadWriteLock(true);
+
+    /*
+     * For JAXB purposes.
+     */
+    public TokenMetadata()
+    {
+    }
+
+    /* Adopts the given maps as-is; they are not copied. */
+    protected TokenMetadata(Map<BigInteger, EndPoint> tokenToEndPointMap, Map<EndPoint, BigInteger> endPointToTokenMap)
+    {
+        tokenToEndPointMap_ = tokenToEndPointMap;
+        endPointToTokenMap_ = endPointToTokenMap;
+    }
+
+    /**
+     * Returns a new TokenMetadata backed by copies of both maps. The two
+     * copies are taken under one read lock so they always describe the
+     * same state.
+     */
+    public TokenMetadata cloneMe()
+    {
+        lock_.readLock().lock();
+        try
+        {
+            Map<BigInteger, EndPoint> tokenToEndPointMap = new HashMap<BigInteger, EndPoint>( tokenToEndPointMap_ );
+            Map<EndPoint, BigInteger> endPointToTokenMap = new HashMap<EndPoint, BigInteger>( endPointToTokenMap_ );
+            return new TokenMetadata( tokenToEndPointMap, endPointToTokenMap );
+        }
+        finally
+        {
+            lock_.readLock().unlock();
+        }
+    }
+
+    /**
+     * Updates the two maps under the write lock. Any token previously
+     * recorded for the endpoint is dropped from the token map first.
+     */
+    public void update(BigInteger token, EndPoint endpoint)
+    {
+        lock_.writeLock().lock();
+        try
+        {
+            BigInteger oldToken = endPointToTokenMap_.get(endpoint);
+            if ( oldToken != null )
+                tokenToEndPointMap_.remove(oldToken);
+            tokenToEndPointMap_.put(token, endpoint);
+            endPointToTokenMap_.put(endpoint, token);
+        }
+        finally
+        {
+            lock_.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Removes the entries for the endpoint from the two maps.
+     * @param endpoint endpoint to forget
+     */
+    public void remove(EndPoint endpoint)
+    {
+        lock_.writeLock().lock();
+        try
+        {
+            BigInteger oldToken = endPointToTokenMap_.get(endpoint);
+            if ( oldToken != null )
+                tokenToEndPointMap_.remove(oldToken);
+            endPointToTokenMap_.remove(endpoint);
+        }
+        finally
+        {
+            lock_.writeLock().unlock();
+        }
+    }
+
+    /* Returns the token recorded for the endpoint, or null when there is none. */
+    public BigInteger getToken(EndPoint endpoint)
+    {
+        lock_.readLock().lock();
+        try
+        {
+            return endPointToTokenMap_.get(endpoint);
+        }
+        finally
+        {
+            lock_.readLock().unlock();
+        }
+    }
+
+    /* Reports whether a token is recorded for the endpoint. */
+    public boolean isKnownEndPoint(EndPoint ep)
+    {
+        lock_.readLock().lock();
+        try
+        {
+            return endPointToTokenMap_.containsKey(ep);
+        }
+        finally
+        {
+            lock_.readLock().unlock();
+        }
+    }
+
+    /*
+     * Returns a copy of tokenToEndPointMap_ taken under the read lock.
+     */
+    public Map<BigInteger, EndPoint> cloneTokenEndPointMap()
+    {
+        lock_.readLock().lock();
+        try
+        {
+            return new HashMap<BigInteger, EndPoint>( tokenToEndPointMap_ );
+        }
+        finally
+        {
+            lock_.readLock().unlock();
+        }
+    }
+
+    /*
+     * Returns a copy of endPointToTokenMap_ taken under the read lock.
+     */
+    public Map<EndPoint, BigInteger> cloneEndPointTokenMap()
+    {
+        lock_.readLock().lock();
+        try
+        {
+            return new HashMap<EndPoint, BigInteger>( endPointToTokenMap_ );
+        }
+        finally
+        {
+            lock_.readLock().unlock();
+        }
+    }
+
+    /* Renders one "endpoint:token" line per known endpoint. */
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        String lineSeparator = System.getProperty("line.separator");
+
+        // Iterate under the read lock, like every other reader, so a
+        // concurrent update cannot modify the map mid-iteration.
+        lock_.readLock().lock();
+        try
+        {
+            for ( Map.Entry<EndPoint, BigInteger> entry : endPointToTokenMap_.entrySet() )
+            {
+                sb.append(entry.getKey());
+                sb.append(":");
+                sb.append(entry.getValue());
+                sb.append(lineSeparator);
+            }
+        }
+        finally
+        {
+            lock_.readLock().unlock();
+        }
+
+        return sb.toString();
+    }
+}
+
+class TokenMetadataSerializer implements ICompactSerializer<TokenMetadata>
+{
+    /**
+     * Writes the entry count, then for each token its byte length, its
+     * bytes and the compact form of the owning endpoint.
+     */
+    public void serialize(TokenMetadata tkMetadata, DataOutputStream dos) throws IOException
+    {
+        Map<BigInteger, EndPoint> snapshot = tkMetadata.cloneTokenEndPointMap();
+        /* write the size */
+        dos.writeInt(snapshot.size());
+        for ( Map.Entry<BigInteger, EndPoint> entry : snapshot.entrySet() )
+        {
+            /* Convert the BigInteger to byte[] and persist */
+            byte[] tokenBytes = entry.getKey().toByteArray();
+            dos.writeInt(tokenBytes.length);
+            dos.write(tokenBytes);
+            /* Write the endpoint out */
+            CompactEndPointSerializationHelper.serialize(entry.getValue(), dos);
+        }
+    }
+
+    /**
+     * Reads back what serialize wrote. Returns null when the stored entry
+     * count is not positive.
+     */
+    public TokenMetadata deserialize(DataInputStream dis) throws IOException
+    {
+        int size = dis.readInt();
+        if ( size <= 0 )
+        {
+            return null;
+        }
+
+        Map<BigInteger, EndPoint> byToken = new HashMap<BigInteger, EndPoint>();
+        Map<EndPoint, BigInteger> byEndPoint = new HashMap<EndPoint, BigInteger>();
+        int remaining = size;
+        while ( remaining-- > 0 )
+        {
+            /* Read the byte[] and convert to BigInteger */
+            byte[] tokenBytes = new byte[dis.readInt()];
+            dis.readFully(tokenBytes);
+            BigInteger token = new BigInteger(tokenBytes);
+            /* Read the endpoint out */
+            EndPoint endpoint = CompactEndPointSerializationHelper.deserialize(dis);
+            byToken.put(token, endpoint);
+            byEndPoint.put(endpoint, token);
+        }
+
+        return new TokenMetadata( byToken, byEndPoint );
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/AsyncResult.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/AsyncResult.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/AsyncResult.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.util.List;
+import java.util.Hashtable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.QuorumResponseHandler;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class AsyncResult implements IAsyncResult
+{
+    private static Logger logger_ = Logger.getLogger( AsyncResult.class );
+    /* Holds an empty array until result() delivers the real value. */
+    private Object[] result_ = new Object[0];
+    private AtomicBoolean done_ = new AtomicBoolean(false);
+    private Lock lock_ = new ReentrantLock();
+    private Condition condition_;
+
+    public AsyncResult()
+    {
+        condition_ = lock_.newCondition();
+    }
+
+    /**
+     * Blocks until a result has been delivered and returns it. If the
+     * waiting thread is interrupted, the interruption is logged, the
+     * thread's interrupt status is restored and whatever result is
+     * currently held (possibly the empty array) is returned.
+     */
+    public Object[] get()
+    {
+        lock_.lock();
+        try
+        {
+            // Loop rather than test once: await() may return spuriously.
+            while ( !done_.get() )
+            {
+                condition_.await();
+            }
+        }
+        catch ( InterruptedException ex )
+        {
+            logger_.warn( LogUtil.throwableToString(ex) );
+            Thread.currentThread().interrupt();
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+        return result_;
+    }
+
+    public boolean isDone()
+    {
+        return done_.get();
+    }
+
+    /**
+     * Blocks until a result has been delivered or the timeout elapses.
+     *
+     * @param timeout maximum time to wait
+     * @param tu unit of the timeout
+     * @return the delivered result
+     * @throws TimeoutException if no result arrived within the timeout
+     */
+    public Object[] get(long timeout, TimeUnit tu) throws TimeoutException
+    {
+        long remainingNanos = tu.toNanos(timeout);
+        lock_.lock();
+        try
+        {
+            try
+            {
+                // awaitNanos reports the time still left, so a spurious
+                // wakeup resumes waiting instead of returning early.
+                while ( !done_.get() )
+                {
+                    if ( remainingNanos <= 0 )
+                    {
+                        throw new TimeoutException("Operation timed out.");
+                    }
+                    remainingNanos = condition_.awaitNanos(remainingNanos);
+                }
+            }
+            catch ( InterruptedException ex )
+            {
+                logger_.warn( LogUtil.throwableToString(ex) );
+                Thread.currentThread().interrupt();
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+        return result_;
+    }
+
+    /**
+     * Delivers the result and wakes every waiting thread. Only the first
+     * call has any effect.
+     */
+    void result(Object[] result)
+    {
+        // Acquire outside the try so unlock() only runs when the lock is held.
+        lock_.lock();
+        try
+        {
+            if ( !done_.get() )
+            {
+                result_ = result;
+                done_.set(true);
+                // signalAll: more than one thread may be blocked in get().
+                condition_.signalAll();
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/CompactEndPointSerializationHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/CompactEndPointSerializationHelper.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/CompactEndPointSerializationHelper.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/CompactEndPointSerializationHelper.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class CompactEndPointSerializationHelper
+{
+    /* Writes the byte form produced by EndPoint.toBytes to the stream. */
+    public static void serialize(EndPoint endPoint, DataOutputStream dos) throws IOException
+    {
+        byte[] encoded = EndPoint.toBytes(endPoint);
+        dos.write(encoded);
+    }
+
+    /* Reads six bytes from the stream and decodes them with EndPoint.fromBytes. */
+    public static EndPoint deserialize(DataInputStream dis) throws IOException
+    {
+        byte[] encoded = new byte[6];
+        dis.readFully(encoded);
+        return EndPoint.fromBytes(encoded);
+    }
+
+    /* Resolves the host and returns its raw address bytes. */
+    private static byte[] getIPAddress(String host) throws UnknownHostException
+    {
+        return InetAddress.getByName(host).getAddress();
+    }
+
+    /* Returns the canonical host name for the raw address bytes. */
+    private static String getHostName(byte[] ipAddr) throws UnknownHostException
+    {
+        return InetAddress.getByAddress(ipAddr).getCanonicalHostName();
+    }
+
+    /* Manual round trip: encodes a local endpoint, prints the encoded length, then the decoded endpoint. */
+    public static void main(String[] args) throws Throwable
+    {
+        EndPoint original = new EndPoint(7000);
+        byte[] encoded = EndPoint.toBytes(original);
+        System.out.println(encoded.length);
+        EndPoint decoded = EndPoint.fromBytes(encoded);
+        System.out.println(decoded);
+    }
+}
\ No newline at end of file
Added: incubator/cassandra/src/org/apache/cassandra/net/ConnectionStatistics.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/ConnectionStatistics.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/ConnectionStatistics.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/ConnectionStatistics.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ConnectionStatistics
+{
+    /* Values captured once at construction time; never changed afterwards. */
+    private final String localHost;
+    private final int localPort;
+    private final String remoteHost;
+    private final int remotePort;
+    private final int totalConnections;
+    private final int connectionsInUse;
+
+    /* Records the host and port of both endpoints together with the two connection counts. */
+    ConnectionStatistics(EndPoint localEp, EndPoint remoteEp, int tc, int ciu)
+    {
+        this.localHost = localEp.getHost();
+        this.localPort = localEp.getPort();
+        this.remoteHost = remoteEp.getHost();
+        this.remotePort = remoteEp.getPort();
+        this.totalConnections = tc;
+        this.connectionsInUse = ciu;
+    }
+
+    public String getLocalHost()
+    {
+        return this.localHost;
+    }
+
+    public int getLocalPort()
+    {
+        return this.localPort;
+    }
+
+    public String getRemoteHost()
+    {
+        return this.remoteHost;
+    }
+
+    public int getRemotePort()
+    {
+        return this.remotePort;
+    }
+
+    public int getTotalConnections()
+    {
+        return this.totalConnections;
+    }
+
+    public int getConnectionInUse()
+    {
+        return this.connectionsInUse;
+    }
+
+    /* Formats both endpoints followed by the two connection counts. */
+    public String toString()
+    {
+        StringBuilder text = new StringBuilder();
+        text.append(localHost).append(":").append(localPort);
+        text.append("->");
+        text.append(remoteHost).append(":").append(remotePort);
+        text.append(" Total Connections open : ").append(totalConnections);
+        text.append(" Connections in use : ").append(connectionsInUse);
+        return text.toString();
+    }
+}
\ No newline at end of file
Added: incubator/cassandra/src/org/apache/cassandra/net/EndPoint.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/EndPoint.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/EndPoint.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/EndPoint.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1 @@
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.net;
import java.io.IOException;
import java.io.Serializable;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
*/
public class EndPoint implements Serializable, Comparable<EndPoint>
{
// logging and profiling.
private static Logger logger_ = Logger.getLogger(EndPoint.class);
private static final long serialVersionUID = -4962625949179835907L;
private static Map<CharBuffer, String> hostNames_ = new HashMap<CharBuffer, String>();
protected static final int randomPort_ = 5555;
public static EndPoint randomLocalEndPoint_;
static
{
try
{
randomLocalEndPoint_ = new EndPoint(FBUtilities.getHostName(), EndPoint.randomPort_);
}
catch ( IOException ex )
{
logger_.warn(LogUtil.throwableToString(ex));
}
}
private String host_;
pri
vate int port_;
private transient InetSocketAddress ia_;
/* Ctor for JAXB. DO NOT DELETE */
private EndPoint()
{
}
public EndPoint(String host, int port)
{
/*
* Attempts to resolve the host, but does not fail if it cannot.
*/
host_ = host;
port_ = port;
}
// create a local endpoint id
public EndPoint(int port)
{
try
{
host_ = FBUtilities.getLocalHostName();
port_ = port;
}
catch (UnknownHostException e)
{
logger_.warn(LogUtil.throwableToString(e));
}
}
public String getHost()
{
return host_;
}
public int getPort()
{
return port_;
}
public void setPort(int port)
{
port_ = port;
}
public InetSocketAddress getInetAddress()
{
if (ia_ == null || ia_.isUnresolved())
{
ia_ = new InetSocketAddress(host_, port_);
}
return ia_;
}
public boolean equals(Object o)
{
if (!(o instanceof EndPoint))
return false;
EndPoint rhs = (EndPoint) o;
return (host_.equals(rhs.host_) && port_ == rhs.port_);
}
public int hashCode()
{
return (host_ + port_).hashCode();
}
public int compareTo(EndPoint rhs)
{
return host_.compareTo(rhs.host_);
}
public String toString()
{
return (host_ + ":" + port_);
}
public static EndPoint fromString(String str)
{
String[] values = str.split(":");
return new EndPoint(values[0], Integer.parseInt(values[1]));
}
public static byte[] toBytes(EndPoint ep)
{
ByteBuffer buffer = ByteBuffer.allocate(6);
byte[] iaBytes = ep.getInetAddress().getAddress().getAddress();
buffer.put(iaBytes);
buffer.put(MessagingService.toByteArray((short) ep.getPort()));
buffer.flip();
return buffer.array();
}
public static EndPoint fromBytes(byte[] bytes)
{
ByteBuffer buffer = ByteBuffer.allocate(4);
System.arraycopy(bytes, 0, buffer.array(), 0, 4);
byte[] portBytes = new byte[2];
System.arraycopy(bytes, 4, portBytes, 0, portBytes.length);
try
{
CharBuffer charBuffer = buffer.asCharBuffer();
String host = hostNa
mes_.get(charBuffer);
if (host == null)
{
host = InetAddress.getByAddress(buffer.array()).getHostName();
hostNames_.put(charBuffer, host);
}
int port = (int) MessagingService.byteArrayToShort(portBytes);
return new EndPoint(host, port);
}
catch (UnknownHostException e)
{
throw new IllegalArgumentException(e);
}
}
}
\ No newline at end of file
Added: incubator/cassandra/src/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/FileStreamTask.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/FileStreamTask.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/FileStreamTask.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.*;
+import java.net.SocketException;
+
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class FileStreamTask implements Runnable
+{
+ private static Logger logger_ = Logger.getLogger( FileStreamTask.class );
+
+ private String file_;
+ private long startPosition_;
+ private long total_;
+ private EndPoint from_;
+ private EndPoint to_;
+
+ FileStreamTask(String file, long startPosition, long total, EndPoint from, EndPoint to)
+ {
+ file_ = file;
+ startPosition_ = startPosition;
+ total_ = total;
+ from_ = from;
+ to_ = to;
+ }
+
+ public void run()
+ {
+ TcpConnection connection = null;
+ try
+ {
+ connection = new TcpConnection(from_, to_);
+ File file = new File(file_);
+ connection.stream(file, startPosition_, total_);
+ MessagingService.setStreamingMode(false);
+ logger_.debug("Done streaming " + file);
+ }
+ catch ( SocketException se )
+ {
+ logger_.info(LogUtil.throwableToString(se));
+ }
+ catch ( IOException e )
+ {
+ logConnectAndIOException(e, connection);
+ }
+ catch (Throwable th)
+ {
+ logger_.warn(LogUtil.throwableToString(th));
+ }
+ }
+
+ private void logConnectAndIOException(IOException ex, TcpConnection connection)
+ {
+ if ( connection != null )
+ {
+ connection.errorClose();
+ }
+ logger_.info(LogUtil.throwableToString(ex));
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/Header.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/Header.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/Header.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.utils.GuidGenerator;
+
+
+
+/**
+ * Header of a Message: the message id, the sending endpoint, the message
+ * type, the verb, and a map of named byte[] details.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Header implements java.io.Serializable
+{
+    static final long serialVersionUID = -3194851946523170022L;
+    private static ICompactSerializer<Header> serializer_ = new HeaderSerializer();
+    /* Source of ids for headers created without an explicit id. */
+    private static AtomicInteger idGen_ = new AtomicInteger(0);
+
+    /** @return the shared compact serializer for headers. */
+    static ICompactSerializer<Header> serializer()
+    {
+        return serializer_;
+    }
+
+    private EndPoint from_;
+    private String type_;
+    private String verb_;
+    private String messageId_;
+    protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
+
+    /** Creates a header with an explicit message id and an empty detail map. */
+    Header(String id, EndPoint from, String messageType, String verb)
+    {
+        this.messageId_ = id;
+        this.from_ = from;
+        this.type_ = messageType;
+        this.verb_ = verb;
+    }
+
+    /** Creates a header with an explicit id that uses the supplied map (not a copy) for its details. */
+    Header(String id, EndPoint from, String messageType, String verb, Map<String, byte[]> details)
+    {
+        this(id, from, messageType, verb);
+        this.details_ = details;
+    }
+
+    /** Creates a header whose id is the next value of the shared counter, as a decimal string. */
+    Header(EndPoint from, String messageType, String verb)
+    {
+        this(Integer.toString(idGen_.incrementAndGet()), from, messageType, verb);
+    }
+
+    EndPoint getFrom()
+    {
+        return from_;
+    }
+
+    String getMessageType()
+    {
+        return type_;
+    }
+
+    String getVerb()
+    {
+        return verb_;
+    }
+
+    String getMessageId()
+    {
+        return messageId_;
+    }
+
+    void setMessageId(String id)
+    {
+        this.messageId_ = id;
+    }
+
+    void setMessageType(String type)
+    {
+        this.type_ = type;
+    }
+
+    void setMessageVerb(String verb)
+    {
+        this.verb_ = verb;
+    }
+
+    /** @return the detail stored under key, or null if there is none. */
+    byte[] getDetail(Object key)
+    {
+        return details_.get(key);
+    }
+
+    void removeDetail(Object key)
+    {
+        details_.remove(key);
+    }
+
+    void addDetail(String key, byte[] value)
+    {
+        details_.put(key, value);
+    }
+
+    /** @return the live detail map (not a copy). */
+    Map<String, byte[]> getDetails()
+    {
+        return details_;
+    }
+}
+
+/**
+ * Compact wire format for Header: id, sender, type and verb, followed by
+ * the detail count and then each detail as (key, length, bytes).
+ */
+class HeaderSerializer implements ICompactSerializer<Header>
+{
+    /**
+     * Writes the header to the stream.
+     * @param t header to write.
+     * @param dos destination stream.
+     * @throws IOException if the stream cannot be written.
+     */
+    public void serialize(Header t, DataOutputStream dos) throws IOException
+    {
+        dos.writeUTF(t.getMessageId());
+        CompactEndPointSerializationHelper.serialize(t.getFrom(), dos);
+        dos.writeUTF(t.getMessageType());
+        dos.writeUTF( t.getVerb() );
+
+        /* Serialize the message header details */
+        Map<String, byte[]> details = t.details_;
+        dos.writeInt(details.size());
+
+        /* Walk the entries directly instead of looking every key up again. */
+        for ( Map.Entry<String, byte[]> detail : details.entrySet() )
+        {
+            byte[] value = detail.getValue();
+            dos.writeUTF(detail.getKey());
+            dos.writeInt(value.length);
+            dos.write(value);
+        }
+    }
+
+    /**
+     * Reads a header previously written by serialize.
+     * @param dis source stream.
+     * @return the reconstructed header.
+     * @throws IOException if the stream cannot be read or carries a
+     *         negative detail count or detail length.
+     */
+    public Header deserialize(DataInputStream dis) throws IOException
+    {
+        String id = dis.readUTF();
+        EndPoint from = CompactEndPointSerializationHelper.deserialize(dis);
+        String type = dis.readUTF();
+        String verb = dis.readUTF();
+
+        /* Deserializing the message header details */
+        int size = dis.readInt();
+        if ( size < 0 )
+        {
+            /* Report corrupt input as an IOException rather than an unchecked exception. */
+            throw new IOException("Corrupt header: negative detail count " + size);
+        }
+        Map<String, byte[]> details = new Hashtable<String, byte[]>(size);
+        for ( int i = 0; i < size; ++i )
+        {
+            String key = dis.readUTF();
+            int length = dis.readInt();
+            if ( length < 0 )
+            {
+                throw new IOException("Corrupt header: negative length " + length + " for detail " + key);
+            }
+            byte[] bytes = new byte[length];
+            dis.readFully(bytes);
+            details.put(key, bytes);
+        }
+
+        return new Header(id, from, type, verb, details);
+    }
+}
+
+
Added: incubator/cassandra/src/org/apache/cassandra/net/HeaderTypes.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/HeaderTypes.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/HeaderTypes.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/HeaderTypes.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+/**
+ * Names of well-known header entries.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class HeaderTypes
+{
+    public final static String TASK_PROFILE_CHAIN = "TASK_PROFILE_CHAIN";
+    /* final, like its sibling: a shared key must not be reassignable at runtime. */
+    public final static String TASK_ID = "TASK_ID";
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/IAsyncCallback.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/IAsyncCallback.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/IAsyncCallback.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/IAsyncCallback.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Callback through which a response message is handed back to the caller
+ * of an asynchronous send.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IAsyncCallback
+{
+    /**
+     * Delivers a response.
+     * @param msg the response message being returned.
+     */
+    void response(Message msg);
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/IAsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/IAsyncResult.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/IAsyncResult.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/IAsyncResult.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Handle on the outcome of an asynchronous request.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IAsyncResult
+{
+    /**
+     * @return the result objects.
+     *         NOTE(review): presumably waits until the result is available - confirm against the implementation.
+     */
+    Object[] get();
+
+    /**
+     * @return true once the result is done.
+     */
+    boolean isDone();
+
+    /**
+     * Fetches the result, giving up after the supplied time limit.
+     * @param timeout how long to wait, in units of tu.
+     * @param tu unit of the timeout value.
+     * @return the result objects.
+     * @throws TimeoutException if the limit elapses first.
+     */
+    Object[] get(long timeout, TimeUnit tu) throws TimeoutException;
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/IMessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/IMessagingService.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/IMessagingService.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/IMessagingService.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import javax.xml.bind.JAXBException;
+
+import org.apache.cassandra.concurrent.IStage;
+
+
+/**
+ * An IMessagingService provides the methods for sending messages to remote
+ * endpoints. It supports both request-response style messages and
+ * fire-and-forget style messages.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IMessagingService
+{
+    /**
+     * Registers a verb and its handler with the messaging service.
+     * @param type name of the verb.
+     * @param verbHandler handler for the specified verb.
+     */
+    void registerVerbHandlers(String type, IVerbHandler verbHandler);
+
+    /**
+     * Deregisters all verb handlers corresponding to localEndPoint.
+     * @param localEndPoint endpoint whose handlers are removed.
+     */
+    void deregisterAllVerbHandlers(EndPoint localEndPoint);
+
+    /**
+     * Deregisters the verb handler registered for the given verb.
+     * @param type name of the verb.
+     */
+    void deregisterVerbHandlers(String type);
+
+    /**
+     * Listens on the port of the specified endpoint.
+     * @param ep EndPoint whose port to listen on.
+     * @param isHttp whether the port is an HTTP port.
+     * @throws IOException if listening cannot be set up.
+     */
+    void listen(EndPoint ep, boolean isHttp) throws IOException;
+
+    /**
+     * Listens for UDP traffic on the port of the specified endpoint.
+     * @param ep EndPoint whose port to listen on.
+     */
+    void listenUDP(EndPoint ep);
+
+    /**
+     * Sends a message to a given endpoint.
+     * @param message message to be sent.
+     * @param to endpoint to which the message needs to be sent.
+     * @return a reference to an IAsyncResult which can be queried for the
+     *         response.
+     */
+    IAsyncResult sendRR(Message message, EndPoint to);
+
+    /**
+     * Sends a message to the given set of endpoints; responses are passed
+     * to the supplied callback.
+     * NOTE(review): earlier documentation spoke of waiting for at least
+     * "howManyResults" responses, but no such parameter exists in this
+     * signature - confirm what the implementation waits for.
+     * @param message message to be sent.
+     * @param to endpoints to which the message needs to be sent.
+     * @param cb callback interface which is used to pass the responses.
+     * @return the message id used to match with the result.
+     */
+    String sendRR(Message message, EndPoint[] to, IAsyncCallback cb);
+
+    /**
+     * Sends a message to a given endpoint. The callback is invoked with the
+     * actual response.
+     * @param message message to be sent.
+     * @param to endpoint to which the message needs to be sent.
+     * @param cb callback interface which is used to pass the responses or
+     *           to tell the sender that a timeout occurred.
+     * @return the message id used to match with the result.
+     */
+    String sendRR(Message message, EndPoint to, IAsyncCallback cb);
+
+    /**
+     * Sends the ith element of the <code>messages</code> array to the ith
+     * element of the <code>to</code> array. The two arrays are expected to
+     * map one-to-one; otherwise an IllegalArgumentException will be thrown.
+     * NOTE(review): earlier documentation spoke of waiting for at least
+     * "howManyResults" responses, but no such parameter exists in this
+     * signature - confirm what the implementation waits for.
+     * @param messages messages to be sent.
+     * @param to endpoints to which the messages need to be sent.
+     * @param cb callback interface which is used to pass the responses or
+     *           to tell the sender that a timeout occurred.
+     * @return the message id used to match with the result.
+     */
+    String sendRR(Message[] messages, EndPoint[] to, IAsyncCallback cb);
+
+    /**
+     * Sends a message to a given endpoint in fire-and-forget style.
+     * @param message message to be sent.
+     * @param to endpoint to which the message needs to be sent.
+     */
+    void sendOneWay(Message message, EndPoint to);
+
+    /**
+     * Sends a message to a given endpoint over UDP in fire-and-forget style.
+     * @param message message to be sent.
+     * @param to endpoint to which the message needs to be sent.
+     */
+    void sendUdpOneWay(Message message, EndPoint to);
+
+    /**
+     * Streams a file from source to destination. Intended to avoid holding
+     * any of the contents of the file in memory.
+     * @param file name of the file to stream.
+     * @param startPosition start position inside the file.
+     * @param total total number of bytes to stream.
+     * @param from endpoint the file is streamed from.
+     * @param to endpoint to which the file is streamed.
+     */
+    void stream(String file, long startPosition, long total, EndPoint from, EndPoint to);
+
+    /**
+     * Returns the verb handler associated with the registered verb, or
+     * null if no handler has been registered.
+     * @param verb verb for which the handler is sought.
+     * @return the IVerbHandler registered for the specified verb.
+     */
+    IVerbHandler getVerbHandler(String verb);
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/IVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/IVerbHandler.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/IVerbHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/IVerbHandler.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Contract that every verb handler implements. A concrete implementation
+ * supplies the behaviour for one verb.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IVerbHandler
+{
+    /**
+     * Delivers a message to the implementing class (provided it was
+     * registered through MessagingService.registerVerbHandlers).
+     * Callers should not hold any locks while invoking this method,
+     * because an implementation may be synchronized.
+     *
+     * @param message incoming message that needs handling.
+     */
+    void doVerb(Message message);
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/Message.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/Message.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/Message.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.lang.reflect.Array;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.*;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+/**
+ * A message exchanged between endpoints: a Header (id, sender, message
+ * type, verb and named byte[] details) plus an Object[] body.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Message implements java.io.Serializable
+{
+    static final long serialVersionUID = 6329198792470413221L;
+    private static ICompactSerializer<Message> serializer_;
+
+    static
+    {
+        serializer_ = new MessageSerializer();
+    }
+
+    /** @return the shared compact serializer for messages. */
+    public static ICompactSerializer<Message> serializer()
+    {
+        return serializer_;
+    }
+
+    Header header_;
+    private Object[] body_ = new Object[0];
+
+    /* Ctor for JAXB. DO NOT DELETE */
+    private Message()
+    {
+    }
+
+    /** Creates a message with an explicit id. */
+    protected Message(String id, EndPoint from, String messageType, String verb, Object[] body)
+    {
+        header_ = new Header(id, from, messageType, verb);
+        body_ = body;
+    }
+
+    /** Creates a message around an existing header. */
+    protected Message(Header header, Object[] body)
+    {
+        header_ = header;
+        body_ = body;
+    }
+
+    /** Creates a message whose id is generated by the Header. */
+    public Message(EndPoint from, String messageType, String verb, Object[] body)
+    {
+        header_ = new Header(from, messageType, verb);
+        body_ = body;
+    }
+
+    /** @return the header detail stored under key, or null if absent. */
+    public byte[] getHeader(Object key)
+    {
+        return header_.getDetail(key);
+    }
+
+    public void removeHeader(Object key)
+    {
+        header_.removeDetail(key);
+    }
+
+    public void setMessageType(String type)
+    {
+        header_.setMessageType(type);
+    }
+
+    public void setMessageVerb(String verb)
+    {
+        header_.setMessageVerb(verb);
+    }
+
+    public void addHeader(String key, byte[] value)
+    {
+        header_.addDetail(key, value);
+    }
+
+    /** @return the header's detail map. */
+    public Map<String, byte[]> getHeaders()
+    {
+        return header_.getDetails();
+    }
+
+    public Object[] getMessageBody()
+    {
+        return body_;
+    }
+
+    public void setMessageBody(Object[] body)
+    {
+        body_ = body;
+    }
+
+    public EndPoint getFrom()
+    {
+        return header_.getFrom();
+    }
+
+    public String getMessageType()
+    {
+        return header_.getMessageType();
+    }
+
+    public String getVerb()
+    {
+        return header_.getVerb();
+    }
+
+    public String getMessageId()
+    {
+        return header_.getMessageId();
+    }
+
+    /**
+     * Lists the classes of the body elements. A non-array element
+     * contributes its own class; a non-empty array contributes the class
+     * of its first element (or the array's component type when that
+     * element is null); empty arrays and null elements contribute nothing.
+     * A null body yields an empty result instead of a NullPointerException,
+     * which keeps toString() usable for logging.
+     *
+     * @return the collected classes, possibly empty, never null.
+     */
+    public Class[] getTypes()
+    {
+        if ( body_ == null )
+        {
+            return new Class[0];
+        }
+
+        List<Class> types = new ArrayList<Class>();
+        for ( Object part : body_ )
+        {
+            if ( part == null )
+            {
+                continue;
+            }
+
+            Class partClass = part.getClass();
+            if ( !partClass.isArray() )
+            {
+                types.add(partClass);
+            }
+            else if ( Array.getLength(part) > 0 )
+            {
+                Object first = Array.get(part, 0);
+                types.add( first != null ? first.getClass() : partClass.getComponentType() );
+            }
+        }
+
+        return types.toArray( new Class[0] );
+    }
+
+    void setMessageId(String id)
+    {
+        header_.setMessageId(id);
+    }
+
+    /**
+     * Builds the reply to this message: same message id, addressed through
+     * the MessagingService response stage and response verb handler.
+     * @param from endpoint the reply is sent from.
+     * @param args body of the reply.
+     * @return the reply message.
+     */
+    public Message getReply(EndPoint from, Object[] args)
+    {
+        Message response = new Message(getMessageId(),
+                                       from,
+                                       MessagingService.responseStage_,
+                                       MessagingService.responseVerbHandler_,
+                                       args);
+        return response;
+    }
+
+    /** Multi-line summary of id, sender, type, verb and body types. */
+    public String toString()
+    {
+        StringBuilder sbuf = new StringBuilder();
+        String separator = System.getProperty("line.separator");
+        sbuf.append("ID:").append(getMessageId()).append(separator);
+        sbuf.append("FROM:").append(getFrom()).append(separator);
+        sbuf.append("TYPE:").append(getMessageType()).append(separator);
+        sbuf.append("VERB:").append(getVerb()).append(separator);
+        sbuf.append("BODY TYPE:").append(getBodyTypes()).append(separator);
+        return sbuf.toString();
+    }
+
+    /* Class names from getTypes(), each followed by a single space. */
+    private String getBodyTypes()
+    {
+        StringBuilder sbuf = new StringBuilder();
+        for ( Class type : getTypes() )
+        {
+            sbuf.append(type.getName());
+            sbuf.append(" ");
+        }
+        return sbuf.toString();
+    }
+}
+
+/**
+ * Compact wire format for Message: the serialized Header followed by the
+ * length and bytes of the body. Only the first body element is written,
+ * and it must be a byte[].
+ */
+class MessageSerializer implements ICompactSerializer<Message>
+{
+    /**
+     * Writes the message to the stream.
+     * @param t message whose first body element is a byte[].
+     * @param dos destination stream.
+     * @throws IOException if the stream cannot be written.
+     */
+    public void serialize(Message t, DataOutputStream dos) throws IOException
+    {
+        Header.serializer().serialize( t.header_, dos);
+        byte[] bytes = (byte[])t.getMessageBody()[0];
+        dos.writeInt(bytes.length);
+        dos.write(bytes);
+    }
+
+    /**
+     * Reads a message previously written by serialize.
+     * @param dis source stream.
+     * @return a message whose body is a single byte[] element.
+     * @throws IOException if the stream cannot be read or carries a
+     *         negative body length.
+     */
+    public Message deserialize(DataInputStream dis) throws IOException
+    {
+        Header header = Header.serializer().deserialize(dis);
+        int size = dis.readInt();
+        if ( size < 0 )
+        {
+            /* Report corrupt input as an IOException rather than NegativeArraySizeException. */
+            throw new IOException("Corrupt message: negative body length " + size);
+        }
+        byte[] bytes = new byte[size];
+        dis.readFully(bytes);
+        return new Message(header, new Object[]{bytes});
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/MessageDeliveryTask.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/MessageDeliveryTask.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/MessageDeliveryTask.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Runnable that hands one message to the verb handler registered for the
+ * message's verb.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MessageDeliveryTask implements Runnable
+{
+    private static Logger logger_ = Logger.getLogger(MessageDeliveryTask.class);
+    private final Message message_;
+
+    public MessageDeliveryTask(Message message)
+    {
+        this.message_ = message;
+    }
+
+    /**
+     * Looks up the handler for the message's verb and invokes it. A message
+     * whose verb has no handler is dropped; anything thrown while
+     * dispatching is logged as a warning and not rethrown.
+     */
+    public void run()
+    {
+        try
+        {
+            String verbName = message_.getVerb();
+            IVerbHandler handler = MessagingService.getMessagingInstance().getVerbHandler(verbName);
+            if ( handler == null )
+            {
+                return;
+            }
+            handler.doVerb(message_);
+        }
+        catch (Throwable th)
+        {
+            logger_.warn( LogUtil.throwableToString(th) );
+        }
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/MessageDeserializationTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/MessageDeserializationTask.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/MessageDeserializationTask.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/MessageDeserializationTask.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.net.io.FastSerializer;
+import org.apache.cassandra.net.io.ISerializer;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Runnable that turns a received byte array back into a Message, passes
+ * it through the server message sinks and hands it to the
+ * MessagingService.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class MessageDeserializationTask implements Runnable
+{
+    private static Logger logger_ = Logger.getLogger(MessageDeserializationTask.class);
+    private static ISerializer serializer_ = new FastSerializer();
+    /* Recorded from the constructor; not consulted by run(). */
+    private int serializerType_;
+    private byte[] bytes_ = new byte[0];
+
+    MessageDeserializationTask(int serializerType, byte[] bytes)
+    {
+        serializerType_ = serializerType;
+        bytes_ = bytes;
+    }
+
+    /**
+     * Deserializes the bytes and, if a message results, runs it through
+     * SinkManager and delivers it via MessagingService.receive.
+     * An IOException during deserialization is logged as a warning.
+     */
+    public void run()
+    {
+        /* For DEBUG only. Printing queue length.
+         * Guarded so the executor lookup, cast and counters are not
+         * evaluated for every message when debug logging is off. */
+        if ( logger_.isDebugEnabled() )
+        {
+            DebuggableThreadPoolExecutor es = (DebuggableThreadPoolExecutor)MessagingService.getDeserilizationExecutor();
+            logger_.debug( "Message Deserialization Task: " + (es.getTaskCount() - es.getCompletedTaskCount()) );
+        }
+        /* END DEBUG */
+        try
+        {
+            Message message = (Message)serializer_.deserialize(bytes_);
+
+            if ( message != null )
+            {
+                message = SinkManager.processServerMessageSink(message);
+                MessagingService.receive(message);
+            }
+        }
+        catch ( IOException ex )
+        {
+            logger_.warn(LogUtil.throwableToString(ex));
+        }
+    }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/MessageSerializationTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/MessageSerializationTask.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/MessageSerializationTask.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/MessageSerializationTask.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.net.SocketException;
+
+import org.apache.cassandra.concurrent.Context;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadLocalContext;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Runnable that sends one outbound {@link Message} to a remote
+ * {@link EndPoint}: the message is passed through the client-side sinks and,
+ * if one is returned, written to a connection obtained from
+ * {@link MessagingService}.
+ */
+class MessageSerializationTask implements Runnable
+{
+    private static Logger logger_ = Logger.getLogger(MessageSerializationTask.class);
+    private Message message_;
+    private EndPoint to_;
+
+    /**
+     * @param message the message to send
+     * @param to      the end point the message is addressed to
+     */
+    public MessageSerializationTask(Message message, EndPoint to)
+    {
+        this.message_ = message;
+        this.to_ = to;
+    }
+
+    /**
+     * @return the message this task was created with
+     */
+    public Message getMessage()
+    {
+        return this.message_;
+    }
+
+    /**
+     * Sends the message. Failures are logged at WARN level and never
+     * propagate out of the try block: a SocketException additionally shuts
+     * down the connection pool for this (from, to) pair, and an IOException
+     * error-closes the connection in use, if any. Whatever connection was
+     * obtained is closed at the end.
+     */
+    public void run()
+    {
+        logPendingTaskCount();
+        publishMessageToThreadContext();
+
+        TcpConnection conn = null;
+        try
+        {
+            Message outbound = SinkManager.processClientMessageSink(message_);
+            if ( outbound != null )
+            {
+                conn = MessagingService.getConnection(message_.getFrom(), to_);
+                conn.write(outbound);
+            }
+        }
+        catch ( SocketException socketFailure )
+        {
+            // Shutting down the entire pool. May be too conservative an approach.
+            MessagingService.getConnectionPool(message_.getFrom(), to_).shutdown();
+            logger_.warn(LogUtil.throwableToString(socketFailure));
+        }
+        catch ( IOException ioFailure )
+        {
+            if ( conn != null )
+            {
+                conn.errorClose();
+            }
+            logger_.warn(LogUtil.throwableToString(ioFailure));
+        }
+        catch ( Throwable unexpected )
+        {
+            logger_.warn(LogUtil.throwableToString(unexpected));
+        }
+        finally
+        {
+            if ( conn != null )
+            {
+                conn.close();
+            }
+        }
+    }
+
+    /* For DEBUG only: logs how many tasks the write executor has accepted but not yet completed. */
+    private void logPendingTaskCount()
+    {
+        DebuggableThreadPoolExecutor writeExecutor = (DebuggableThreadPoolExecutor)MessagingService.getWriteExecutor();
+        long pending = writeExecutor.getTaskCount() - writeExecutor.getCompletedTaskCount();
+        logger_.debug( "Message Serialization Task: " + pending );
+    }
+
+    /*
+     * Stores the message in the thread-local context, keyed by this task's
+     * runtime class name. Per the original authors' note this is read back
+     * in afterExecute().
+     */
+    private void publishMessageToThreadContext()
+    {
+        Context context = new Context();
+        context.put(this.getClass().getName(), message_);
+        ThreadLocalContext.put(context);
+    }
+}
+
Added: incubator/cassandra/src/org/apache/cassandra/net/MessagingConfig.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/MessagingConfig.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/MessagingConfig.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/MessagingConfig.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Static holder for the messaging layer's tunables: expected round-trip
+ * time, number of ports, messaging thread count, and read-only access to the
+ * connection pool settings kept in {@link ConnectionPoolConfiguration}.
+ *
+ * Every setter ignores non-positive arguments and leaves the current value
+ * in place.
+ */
+public class MessagingConfig
+{
+    // The expected time for one message round trip, in milliseconds. It does not
+    // reflect message processing time at the receiver.
+    private static int expectedRoundTripTime_ = 400;
+    private static int numberOfPorts_ = 2;
+    private static int threadCount_ = 4;
+
+    /**
+     * @return the configured number of messaging threads (4 unless changed)
+     */
+    public static int getMessagingThreadCount()
+    {
+        return threadCount_;
+    }
+
+    /**
+     * Sets the number of messaging threads.
+     *
+     * @param threadCount new thread count; values that are not positive are
+     *                    ignored, as in {@link #setExpectedRoundTripTime(int)}
+     */
+    public static void setMessagingThreadCount(int threadCount)
+    {
+        if ( threadCount > 0 )
+        {
+            threadCount_ = threadCount;
+        }
+    }
+
+    /**
+     * Sets the expected round-trip time for one message.
+     *
+     * @param roundTripTimeMillis new value in milliseconds; values that are
+     *                            not positive are ignored
+     */
+    public static void setExpectedRoundTripTime(int roundTripTimeMillis)
+    {
+        if ( roundTripTimeMillis > 0 )
+        {
+            expectedRoundTripTime_ = roundTripTimeMillis;
+        }
+    }
+
+    /**
+     * @return the expected round-trip time in milliseconds (400 unless changed)
+     */
+    public static int getExpectedRoundTripTime()
+    {
+        return expectedRoundTripTime_;
+    }
+
+    /**
+     * @return the current value of {@code ConnectionPoolConfiguration.initialSize_}
+     */
+    public static int getConnectionPoolInitialSize()
+    {
+        return ConnectionPoolConfiguration.initialSize_;
+    }
+
+    /**
+     * @return the current value of {@code ConnectionPoolConfiguration.growthFactor_}
+     */
+    public static int getConnectionPoolGrowthFactor()
+    {
+        return ConnectionPoolConfiguration.growthFactor_;
+    }
+
+    /**
+     * @return the current value of {@code ConnectionPoolConfiguration.maxSize_}
+     */
+    public static int getConnectionPoolMaxSize()
+    {
+        return ConnectionPoolConfiguration.maxSize_;
+    }
+
+    /**
+     * @return the current value of {@code ConnectionPoolConfiguration.waitTimeout_}
+     */
+    public static int getConnectionPoolWaitTimeout()
+    {
+        return ConnectionPoolConfiguration.waitTimeout_;
+    }
+
+    /**
+     * @return the current value of {@code ConnectionPoolConfiguration.monitorInterval_}
+     */
+    public static int getConnectionPoolMonitorInterval()
+    {
+        return ConnectionPoolConfiguration.monitorInterval_;
+    }
+
+    /**
+     * Sets the number of ports used by the messaging layer.
+     *
+     * @param n new port count; values that are not positive are ignored,
+     *          as in {@link #setExpectedRoundTripTime(int)}
+     */
+    public static void setNumberOfPorts(int n)
+    {
+        if ( n > 0 )
+        {
+            numberOfPorts_ = n;
+        }
+    }
+
+    /**
+     * @return the configured number of ports (2 unless changed)
+     */
+    public static int getNumberOfPorts()
+    {
+        return numberOfPorts_;
+    }
+}
+
+/**
+ * Connection pool settings read through the getConnectionPool* accessors of
+ * MessagingConfig. The fields are mutable and non-final, so code with access
+ * to this class may change them.
+ */
+class ConnectionPoolConfiguration
+{
+    public static int initialSize_ = 1;
+    public static int growthFactor_ = 1;
+    public static int maxSize_ = 1;
+    // NOTE(review): time unit is not stated anywhere in this file - confirm against the pool implementation.
+    public static int waitTimeout_ = 10;
+    // NOTE(review): time unit is not stated anywhere in this file - confirm against the pool implementation.
+    public static int monitorInterval_ = 300;
+}