You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/21 20:26:04 UTC
svn commit: r828130 [3/3] - in /incubator/cassandra/trunk:
contrib/bmt_example/
contrib/property_snitch/src/java/org/apache/cassandra/locator/
src/java/org/apache/cassandra/client/ src/java/org/apache/cassandra/config/
src/java/org/apache/cassandra/db/...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java Wed Oct 21 18:26:02 2009
@@ -24,10 +24,13 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
+import java.net.InetAddress;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
import org.apache.log4j.Logger;
public class StreamContextManager
@@ -42,7 +45,6 @@
public static class StreamContext implements Serializable
{
- private static Logger logger_ = Logger.getLogger(StreamContextManager.StreamContext.class);
private static ICompactSerializer<StreamContext> serializer_;
static
@@ -218,7 +220,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
StreamStatusMessage.serializer().serialize(streamStatusMessage, dos);
- return new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapTerminateVerbHandler_, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), "", StorageService.bootStrapTerminateVerbHandler_, bos.toByteArray());
}
protected StreamContextManager.StreamStatus streamStatus_;
@@ -249,13 +251,13 @@
}
/* Maintain a stream context per host that is the source of the stream */
- public static final Map<String, List<StreamContext>> ctxBag_ = new Hashtable<String, List<StreamContext>>();
+ public static final Map<InetAddress, List<StreamContext>> ctxBag_ = new Hashtable<InetAddress, List<StreamContext>>();
/* Maintain in this map the status of the streams that need to be sent back to the source */
- public static final Map<String, List<StreamStatus>> streamStatusBag_ = new Hashtable<String, List<StreamStatus>>();
+ public static final Map<InetAddress, List<StreamStatus>> streamStatusBag_ = new Hashtable<InetAddress, List<StreamStatus>>();
/* Maintains a callback handler per endpoint to notify the app that a stream from a given endpoint has been handled */
- public static final Map<String, IStreamComplete> streamNotificationHandlers_ = new HashMap<String, IStreamComplete>();
+ public static final Map<InetAddress, IStreamComplete> streamNotificationHandlers_ = new HashMap<InetAddress, IStreamComplete>();
- public synchronized static StreamContext getStreamContext(String key)
+ public synchronized static StreamContext getStreamContext(InetAddress key)
{
List<StreamContext> context = ctxBag_.get(key);
if ( context == null )
@@ -266,7 +268,7 @@
return streamContext;
}
- public synchronized static StreamStatus getStreamStatus(String key)
+ public synchronized static StreamStatus getStreamStatus(InetAddress key)
{
List<StreamStatus> status = streamStatusBag_.get(key);
if ( status == null )
@@ -281,27 +283,27 @@
* This method helps determine if the StreamCompletionHandler needs
* to be invoked for the data being streamed from a source.
*/
- public synchronized static boolean isDone(String key)
+ public synchronized static boolean isDone(InetAddress key)
{
return (ctxBag_.get(key) == null);
}
- public synchronized static IStreamComplete getStreamCompletionHandler(String key)
+ public synchronized static IStreamComplete getStreamCompletionHandler(InetAddress key)
{
return streamNotificationHandlers_.get(key);
}
- public synchronized static void removeStreamCompletionHandler(String key)
+ public synchronized static void removeStreamCompletionHandler(InetAddress key)
{
streamNotificationHandlers_.remove(key);
}
- public synchronized static void registerStreamCompletionHandler(String key, IStreamComplete streamComplete)
+ public synchronized static void registerStreamCompletionHandler(InetAddress key, IStreamComplete streamComplete)
{
streamNotificationHandlers_.put(key, streamComplete);
}
- public synchronized static void addStreamContext(String key, StreamContext streamContext, StreamStatus streamStatus)
+ public synchronized static void addStreamContext(InetAddress key, StreamContext streamContext, StreamStatus streamStatus)
{
/* Record the stream context */
List<StreamContext> context = ctxBag_.get(key);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java Wed Oct 21 18:26:02 2009
@@ -18,7 +18,7 @@
package org.apache.cassandra.net.sink;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.net.Message;
public interface IMessageSink
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java Wed Oct 21 18:26:02 2009
@@ -21,7 +21,7 @@
import java.util.*;
import java.io.IOException;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.net.Message;
public class SinkManager
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java Wed Oct 21 18:26:02 2009
@@ -21,7 +21,7 @@
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Set;
+import java.net.InetAddress;
import org.apache.log4j.Logger;
@@ -55,14 +55,14 @@
private void setup() throws IOException, TTransportException
{
int listenPort = DatabaseDescriptor.getThriftPort();
- String listenAddr = DatabaseDescriptor.getThriftAddress();
+ InetAddress listenAddr = DatabaseDescriptor.getThriftAddress();
/*
* If ThriftAddress was left completely unconfigured, then assume
- * the same default as ListenAddress, (InetAddress.getLocalHost).
+ * the same default as ListenAddress
*/
if (listenAddr == null)
- listenAddr = FBUtilities.getHostAddress();
+ listenAddr = FBUtilities.getLocalAddress();
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Wed Oct 21 18:26:02 2009
@@ -33,7 +33,7 @@
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
import org.apache.cassandra.dht.Token;
@@ -509,9 +509,9 @@
else if (propertyName.equals(TOKEN_MAP))
{
HashMap<String, String> tokenToHostMap = new HashMap<String,String>();
- Map<Token, EndPoint> endpointMap = storageService.getLiveEndPointMap();
- for (Map.Entry<Token, EndPoint> e : endpointMap.entrySet())
- tokenToHostMap.put(e.getKey().toString(), e.getValue().getHost());
+ Map<Token, InetAddress> endpointMap = storageService.getLiveEndPointMap();
+ for (Map.Entry<Token, InetAddress> e : endpointMap.entrySet())
+ tokenToHostMap.put(e.getKey().toString(), e.getValue().getHostAddress());
return new JSONSerializer().serialize(tokenToHostMap);
}
else if (propertyName.equals("version"))
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java Wed Oct 21 18:26:02 2009
@@ -27,14 +27,12 @@
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.Cachetable;
-import org.apache.cassandra.utils.ICacheExpungeHook;
-import org.apache.cassandra.utils.ICachetable;
-import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.*;
+
import org.apache.log4j.Logger;
import org.apache.commons.lang.StringUtils;
@@ -82,13 +80,13 @@
{
IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
/* Add the local storage endpoint to the replicas_ list */
- replicas_.add(StorageService.getLocalStorageEndPoint());
+ replicas_.add(FBUtilities.getLocalAddress());
IAsyncCallback responseHandler = new DataRepairHandler(ConsistencyManager.this.replicas_.size(), readResponseResolver);
ReadCommand readCommand = constructReadMessage(false);
Message message = readCommand.makeReadMessage();
if (logger_.isDebugEnabled())
logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
- MessagingService.instance().sendRR(message, replicas_.toArray(new EndPoint[replicas_.size()]), responseHandler);
+ MessagingService.instance().sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), responseHandler);
}
}
@@ -133,10 +131,10 @@
private static long scheduledTimeMillis_ = 600;
private static ICachetable<String, String> readRepairTable_ = new Cachetable<String, String>(scheduledTimeMillis_);
private final Row row_;
- protected final List<EndPoint> replicas_;
+ protected final List<InetAddress> replicas_;
private final ReadCommand readCommand_;
- public ConsistencyManager(Row row, List<EndPoint> replicas, ReadCommand readCommand)
+ public ConsistencyManager(Row row, List<InetAddress> replicas, ReadCommand readCommand)
{
row_ = row;
replicas_ = replicas;
@@ -151,7 +149,7 @@
Message message = readCommandDigestOnly.makeReadMessage();
if (logger_.isDebugEnabled())
logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
- MessagingService.instance().sendRR(message, replicas_.toArray(new EndPoint[replicas_.size()]), new DigestResponseHandler());
+ MessagingService.instance().sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), new DigestResponseHandler());
}
catch (IOException ex)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java Wed Oct 21 18:26:02 2009
@@ -22,7 +22,9 @@
import java.util.concurrent.locks.*;
import org.apache.cassandra.db.RowMutationMessage;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.Cachetable;
@@ -53,20 +55,26 @@
* This is the internal class which actually
* implements the global hook function called by the read repair manager
*/
- static class ReadRepairPerformer implements
- ICacheExpungeHook<String, Message>
+ static class ReadRepairPerformer implements ICacheExpungeHook<String, Message>
{
/*
* The hook function which takes the end point and the row mutation that
* needs to be sent to the end point in order
* to perform read repair.
*/
- public void callMe(String target,
- Message message)
+ public void callMe(String target, Message message)
{
String[] pieces = FBUtilities.strip(target, ":");
- EndPoint to = new EndPoint(pieces[0], Integer.parseInt(pieces[1]));
- MessagingService.instance().sendOneWay(message, to);
+ InetAddress to = null;
+ try
+ {
+ to = InetAddress.getByName(pieces[0]);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ MessagingService.instance().sendOneWay(message, to);
}
}
@@ -101,7 +109,7 @@
* @param target endpoint on which the read repair should happen
* @param rowMutationMessage the row mutation message that has the repaired row.
*/
- public void schedule(EndPoint target, RowMutationMessage rowMutationMessage)
+ public void schedule(InetAddress target, RowMutationMessage rowMutationMessage)
{
try
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Wed Oct 21 18:26:02 2009
@@ -31,7 +31,7 @@
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.RowMutationMessage;
import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.LogUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -61,7 +61,7 @@
long startTime = System.currentTimeMillis();
Row retRow = null;
List<Row> rowList = new ArrayList<Row>();
- List<EndPoint> endPoints = new ArrayList<EndPoint>();
+ List<InetAddress> endPoints = new ArrayList<InetAddress>();
String key = null;
String table = null;
byte[] digest = new byte[0];
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Wed Oct 21 18:26:02 2009
@@ -34,10 +34,11 @@
import org.apache.cassandra.gms.EndPointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
/*
* The load balancing algorithm here is an implementation of
@@ -71,14 +72,14 @@
/*
int threshold = (int)(StorageLoadBalancer.TOPHEAVY_RATIO * averageSystemLoad());
int myLoad = localLoad();
- EndPoint predecessor = StorageService.instance().getPredecessor(StorageService.getLocalStorageEndPoint());
+ InetAddress predecessor = StorageService.instance().getPredecessor(StorageService.getLocalStorageEndPoint());
if (logger_.isDebugEnabled())
logger_.debug("Trying to relocate the predecessor " + predecessor);
boolean value = tryThisNode(myLoad, threshold, predecessor);
if ( !value )
{
loadInfo2_.remove(predecessor);
- EndPoint successor = StorageService.instance().getSuccessor(StorageService.getLocalStorageEndPoint());
+ InetAddress successor = StorageService.instance().getSuccessor(StorageService.getLocalStorageEndPoint());
if (logger_.isDebugEnabled())
logger_.debug("Trying to relocate the successor " + successor);
value = tryThisNode(myLoad, threshold, successor);
@@ -87,7 +88,7 @@
loadInfo2_.remove(successor);
while ( !loadInfo2_.isEmpty() )
{
- EndPoint target = findARandomLightNode();
+ InetAddress target = findARandomLightNode();
if ( target != null )
{
if (logger_.isDebugEnabled())
@@ -123,7 +124,7 @@
}
/*
- private boolean tryThisNode(int myLoad, int threshold, EndPoint target)
+ private boolean tryThisNode(int myLoad, int threshold, InetAddress target)
{
boolean value = false;
LoadInfo li = loadInfo2_.get(target);
@@ -155,7 +156,7 @@
{
public void doVerb(Message message)
{
- Message reply = message.getReply(StorageService.getLocalStorageEndPoint(), new byte[] {(byte)(isMoveable_.get() ? 1 : 0)});
+ Message reply = message.getReply(FBUtilities.getLocalAddress(), new byte[] {(byte)(isMoveable_.get() ? 1 : 0)});
MessagingService.instance().sendOneWay(reply, message.getFrom());
if ( isMoveable_.get() )
{
@@ -186,9 +187,9 @@
/* this indicates whether this node is already helping someone else */
private AtomicBoolean isMoveable_ = new AtomicBoolean(false);
- private Map<EndPoint, Double> loadInfo_ = new HashMap<EndPoint, Double>();
+ private Map<InetAddress, Double> loadInfo_ = new HashMap<InetAddress, Double>();
/* This map is a clone of the one above and is used for various calculations during LB operation */
- private Map<EndPoint, Double> loadInfo2_ = new HashMap<EndPoint, Double>();
+ private Map<InetAddress, Double> loadInfo2_ = new HashMap<InetAddress, Double>();
/* This thread pool is used for initiating load balancing operations */
private ExecutorService lb_ = new DebuggableThreadPoolExecutor("LB-OPERATIONS");
/* This thread pool is used by target node to leave the ring. */
@@ -204,7 +205,7 @@
Gossiper.instance().register(this);
}
- public void onChange(EndPoint endpoint, EndPointState epState)
+ public void onChange(InetAddress endpoint, EndPointState epState)
{
// load information for this specified endpoint for load balancing
ApplicationState loadInfoState = epState.getApplicationState(LoadDisseminator.loadInfo_);
@@ -232,7 +233,7 @@
if ( !isMoveable_.get() )
return false;
int myload = localLoad();
- EndPoint successor = StorageService.instance().getSuccessor(StorageService.getLocalStorageEndPoint());
+ InetAddress successor = StorageService.instance().getSuccessor(StorageService.getLocalStorageEndPoint());
LoadInfo li = loadInfo2_.get(successor);
// "load" is NULL means that the successor node has not
// yet gossiped its load information. We should return
@@ -249,17 +250,17 @@
private double localLoad()
{
- Double load = loadInfo2_.get(StorageService.getLocalStorageEndPoint());
+ Double load = loadInfo2_.get(FBUtilities.getLocalAddress());
return load == null ? 0 : load;
}
private double averageSystemLoad()
{
int nodeCount = loadInfo2_.size();
- Set<EndPoint> nodes = loadInfo2_.keySet();
+ Set<InetAddress> nodes = loadInfo2_.keySet();
double systemLoad = 0;
- for (EndPoint node : nodes)
+ for (InetAddress node : nodes)
{
systemLoad += loadInfo2_.get(node);
}
@@ -274,7 +275,7 @@
return ( localLoad() > ( StorageLoadBalancer.TOPHEAVY_RATIO * averageSystemLoad() ) );
}
- private boolean isMoveable(EndPoint target)
+ private boolean isMoveable(InetAddress target)
{
double threshold = StorageLoadBalancer.TOPHEAVY_RATIO * averageSystemLoad();
if (isANeighbour(target))
@@ -295,20 +296,20 @@
}
else
{
- EndPoint successor = StorageService.instance().getSuccessor(target);
+ InetAddress successor = StorageService.instance().getSuccessor(target);
double sLoad = loadInfo2_.get(successor);
double targetLoad = loadInfo2_.get(target);
return (sLoad + targetLoad) <= threshold;
}
}
- private boolean isANeighbour(EndPoint neighbour)
+ private boolean isANeighbour(InetAddress neighbour)
{
- EndPoint predecessor = StorageService.instance().getPredecessor(StorageService.getLocalStorageEndPoint());
+ InetAddress predecessor = StorageService.instance().getPredecessor(FBUtilities.getLocalAddress());
if ( predecessor.equals(neighbour) )
return true;
- EndPoint successor = StorageService.instance().getSuccessor(StorageService.getLocalStorageEndPoint());
+ InetAddress successor = StorageService.instance().getSuccessor(FBUtilities.getLocalAddress());
if ( successor.equals(neighbour) )
return true;
@@ -320,13 +321,13 @@
* random one of the lightly loaded nodes and use them as
* a potential target for load balance.
*/
- private EndPoint findARandomLightNode()
+ private InetAddress findARandomLightNode()
{
- List<EndPoint> potentialCandidates = new ArrayList<EndPoint>();
- Set<EndPoint> allTargets = loadInfo2_.keySet();
+ List<InetAddress> potentialCandidates = new ArrayList<InetAddress>();
+ Set<InetAddress> allTargets = loadInfo2_.keySet();
double avgLoad = averageSystemLoad();
- for (EndPoint target : allTargets)
+ for (InetAddress target : allTargets)
{
double load = loadInfo2_.get(target);
if (load < avgLoad)
@@ -344,7 +345,7 @@
return null;
}
- public Map<EndPoint, Double> getLoadInfo()
+ public Map<InetAddress, Double> getLoadInfo()
{
return loadInfo_;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Oct 21 18:26:02 2009
@@ -28,11 +28,12 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.TimedStatsDeque;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.dht.IPartitioner;
@@ -69,15 +70,15 @@
* sent over the wire to N replicas where some of the replicas
* may be hints.
*/
- private static Map<EndPoint, Message> createWriteMessages(RowMutation rm, Map<EndPoint, EndPoint> endpointMap) throws IOException
+ private static Map<InetAddress, Message> createWriteMessages(RowMutation rm, Map<InetAddress, InetAddress> endpointMap) throws IOException
{
- Map<EndPoint, Message> messageMap = new HashMap<EndPoint, Message>();
+ Map<InetAddress, Message> messageMap = new HashMap<InetAddress, Message>();
Message message = rm.makeRowMutationMessage();
- for (Map.Entry<EndPoint, EndPoint> entry : endpointMap.entrySet())
+ for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
{
- EndPoint target = entry.getKey();
- EndPoint hint = entry.getValue();
+ InetAddress target = entry.getKey();
+ InetAddress hint = entry.getValue();
if ( !target.equals(hint) )
{
Message hintedMessage = rm.makeRowMutationMessage();
@@ -113,13 +114,14 @@
long startTime = System.currentTimeMillis();
try
{
- EndPoint[] naturalEndpoints = StorageService.instance().getReadStorageEndPoints(rm.key());
- Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getHintedStorageEndpointMap(rm.key(), naturalEndpoints);
- Map<EndPoint, Message> messageMap = createWriteMessages(rm, endpointMap);
- for (Map.Entry<EndPoint, Message> entry : messageMap.entrySet())
+ InetAddress[] naturalEndpoints = StorageService.instance().getReadStorageEndPoints(rm.key());
+ // (This is the ZERO consistency level, so user doesn't care if we don't really have N destinations available.)
+ Map<InetAddress, InetAddress> endpointMap = StorageService.instance().getHintedStorageEndpointMap(rm.key(), naturalEndpoints);
+ Map<InetAddress, Message> messageMap = createWriteMessages(rm, endpointMap);
+ for (Map.Entry<InetAddress, Message> entry : messageMap.entrySet())
{
Message message = entry.getValue();
- EndPoint endpoint = entry.getKey();
+ InetAddress endpoint = entry.getKey();
if (logger.isDebugEnabled())
logger.debug("insert writing key " + rm.key() + " to " + message.getMessageId() + "@" + endpoint);
MessagingService.instance().sendOneWay(message, endpoint);
@@ -149,10 +151,10 @@
}
try
{
- EndPoint[] naturalEndpoints = StorageService.instance().getReadStorageEndPoints(rm.key());
- Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getHintedStorageEndpointMap(rm.key(), naturalEndpoints);
+ InetAddress[] naturalEndpoints = StorageService.instance().getReadStorageEndPoints(rm.key());
+ Map<InetAddress, InetAddress> endpointMap = StorageService.instance().getHintedStorageEndpointMap(rm.key(), naturalEndpoints);
int blockFor = determineBlockFor(naturalEndpoints.length, endpointMap.size(), consistency_level);
- List<EndPoint> primaryNodes = getUnhintedNodes(endpointMap);
+ List<InetAddress> primaryNodes = getUnhintedNodes(endpointMap);
if (primaryNodes.size() < blockFor) // guarantee blockFor = W live nodes.
{
throw new UnavailableException();
@@ -162,12 +164,12 @@
logger.debug("insertBlocking writing key " + rm.key() + " to " + message.getMessageId() + "@[" + StringUtils.join(endpointMap.keySet(), ", ") + "]");
// Get all the targets and stick them in an array
- MessagingService.instance().sendRR(message, primaryNodes.toArray(new EndPoint[primaryNodes.size()]), quorumResponseHandler);
+ MessagingService.instance().sendRR(message, primaryNodes.toArray(new InetAddress[primaryNodes.size()]), quorumResponseHandler);
if (!quorumResponseHandler.get())
throw new UnavailableException();
if (primaryNodes.size() < endpointMap.size()) // Do we need to bother with Hinted Handoff?
{
- for (Map.Entry<EndPoint, EndPoint> e : endpointMap.entrySet())
+ for (Map.Entry<InetAddress, InetAddress> e : endpointMap.entrySet())
{
if (e.getKey() != e.getValue()) // Hinted Handoff to target
{
@@ -187,10 +189,10 @@
}
}
- private static List<EndPoint> getUnhintedNodes(Map<EndPoint, EndPoint> endpointMap)
+ private static List<InetAddress> getUnhintedNodes(Map<InetAddress, InetAddress> endpointMap)
{
- List<EndPoint> liveEndPoints = new ArrayList<EndPoint>(endpointMap.size());
- for (Map.Entry<EndPoint, EndPoint> e : endpointMap.entrySet())
+ List<InetAddress> liveEndPoints = new ArrayList<InetAddress>(endpointMap.size());
+ for (Map.Entry<InetAddress, InetAddress> e : endpointMap.entrySet())
{
if (e.getKey() == e.getValue())
{
@@ -247,7 +249,7 @@
for (ReadCommand command: commands)
{
- EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.key);
+ InetAddress endPoint = StorageService.instance().findSuitableEndPoint(command.key);
Message message = command.makeReadMessage();
if (logger.isDebugEnabled())
@@ -296,8 +298,8 @@
for (ReadCommand command: commands)
{
- EndPoint[] endpoints = StorageService.instance().getReadStorageEndPoints(command.key);
- boolean foundLocal = Arrays.asList(endpoints).contains(StorageService.getLocalStorageEndPoint());
+ InetAddress[] endpoints = StorageService.instance().getReadStorageEndPoints(command.key);
+ boolean foundLocal = Arrays.asList(endpoints).contains(FBUtilities.getLocalAddress());
//TODO: Throw InvalidRequest if we're in bootstrap mode?
if (foundLocal && !StorageService.instance().isBootstrapMode())
{
@@ -340,7 +342,7 @@
private static List<Row> strongRead(List<ReadCommand> commands) throws IOException, TimeoutException, InvalidRequestException, UnavailableException
{
List<QuorumResponseHandler<Row>> quorumResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
- List<EndPoint[]> commandEndPoints = new ArrayList<EndPoint[]>();
+ List<InetAddress[]> commandEndPoints = new ArrayList<InetAddress[]>();
List<Row> rows = new ArrayList<Row>();
int commandIndex = 0;
@@ -356,11 +358,11 @@
Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver());
- EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
- List<EndPoint> endpointList = new ArrayList<EndPoint>(Arrays.asList(StorageService.instance().getReadStorageEndPoints(command.key)));
+ InetAddress dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
+ List<InetAddress> endpointList = new ArrayList<InetAddress>(Arrays.asList(StorageService.instance().getReadStorageEndPoints(command.key)));
/* Remove the local storage endpoint from the list. */
endpointList.remove(dataPoint);
- EndPoint[] endPoints = new EndPoint[endpointList.size() + 1];
+ InetAddress[] endPoints = new InetAddress[endpointList.size() + 1];
Message messages[] = new Message[endpointList.size() + 1];
/*
@@ -374,7 +376,7 @@
logger.debug("strongread reading data for " + command + " from " + message.getMessageId() + "@" + dataPoint);
for (int i = 1; i < endPoints.length; i++)
{
- EndPoint digestPoint = endpointList.get(i - 1);
+ InetAddress digestPoint = endpointList.get(i - 1);
endPoints[i] = digestPoint;
messages[i] = messageDigestOnly;
if (logger.isDebugEnabled())
@@ -440,9 +442,9 @@
List<Row> rows = new ArrayList<Row>();
for (ReadCommand command: commands)
{
- List<EndPoint> endpoints = StorageService.instance().getLiveReadStorageEndPoints(command.key);
+ List<InetAddress> endpoints = StorageService.instance().getLiveReadStorageEndPoints(command.key);
/* Remove the local storage endpoint from the list. */
- endpoints.remove(StorageService.getLocalStorageEndPoint());
+ endpoints.remove(FBUtilities.getLocalAddress());
// TODO: throw a thrift exception if we do not have N nodes
if (logger.isDebugEnabled())
@@ -471,9 +473,9 @@
List<String> allKeys = new ArrayList<String>();
RangeCommand command = rawCommand;
- EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.startWith);
- EndPoint startEndpoint = endPoint;
- EndPoint wrapEndpoint = tokenMetadata.getFirstEndpoint();
+ InetAddress endPoint = StorageService.instance().findSuitableEndPoint(command.startWith);
+ InetAddress startEndpoint = endPoint;
+ InetAddress wrapEndpoint = tokenMetadata.getFirstEndpoint();
do
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Oct 21 18:26:02 2009
@@ -18,13 +18,13 @@
package org.apache.cassandra.service;
-import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.net.InetAddress;
import javax.management.*;
import org.apache.cassandra.concurrent.*;
@@ -34,8 +34,6 @@
import org.apache.cassandra.gms.*;
import org.apache.cassandra.locator.*;
import org.apache.cassandra.net.*;
-import org.apache.cassandra.net.io.StreamContextManager;
-import org.apache.cassandra.tools.MembershipCleanerVerbHandler;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -70,40 +68,26 @@
public final static String bootStrapInitiateDoneVerbHandler_ = "BOOTSTRAP-INITIATE-DONE-VERB-HANDLER";
public final static String bootStrapTerminateVerbHandler_ = "BOOTSTRAP-TERMINATE-VERB-HANDLER";
public final static String dataFileVerbHandler_ = "DATA-FILE-VERB-HANDLER";
- public final static String mbrshipCleanerVerbHandler_ = "MBRSHIP-CLEANER-VERB-HANDLER";
public final static String bootstrapMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
public final static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";
public final static String bootstrapTokenVerbHandler_ = "SPLITS-VERB-HANDLER";
- private static EndPoint tcpAddr_;
- private static EndPoint udpAddr_;
private static IPartitioner partitioner_ = DatabaseDescriptor.getPartitioner();
-
private static volatile StorageService instance_;
- public static EndPoint getLocalStorageEndPoint()
- {
- return tcpAddr_;
- }
-
- public static EndPoint getLocalControlEndPoint()
- {
- return udpAddr_;
- }
-
public static IPartitioner<?> getPartitioner() {
return partitioner_;
}
public Set<Range> getLocalRanges()
{
- return getRangesForEndPoint(getLocalStorageEndPoint());
+ return getRangesForEndPoint(FBUtilities.getLocalAddress());
}
public Range getLocalPrimaryRange()
{
- return getPrimaryRangeForEndPoint(getLocalStorageEndPoint());
+ return getPrimaryRangeForEndPoint(FBUtilities.getLocalAddress());
}
/*
@@ -157,16 +141,16 @@
private AbstractReplicationStrategy replicationStrategy_;
/* Are we starting this node in bootstrap mode? */
private boolean isBootstrapMode;
- private Set<EndPoint> bootstrapSet;
+ private Set<InetAddress> bootstrapSet;
- public synchronized void addBootstrapSource(EndPoint s)
+ public synchronized void addBootstrapSource(InetAddress s)
{
if (logger_.isDebugEnabled())
logger_.debug("Added " + s + " as a bootstrap source");
bootstrapSet.add(s);
}
- public synchronized boolean removeBootstrapSource(EndPoint s)
+ public synchronized boolean removeBootstrapSource(InetAddress s)
{
bootstrapSet.remove(s);
@@ -176,7 +160,7 @@
{
SystemTable.setBootstrapped();
isBootstrapMode = false;
- updateTokenMetadata(storageMetadata_.getToken(), StorageService.tcpAddr_, false);
+ updateTokenMetadata(storageMetadata_.getToken(), FBUtilities.getLocalAddress(), false);
logger_.info("Bootstrap completed! Now serving reads.");
/* Tell others you're not bootstrapping anymore */
@@ -185,7 +169,7 @@
return isBootstrapMode;
}
- private void updateTokenMetadata(Token token, EndPoint endpoint, boolean isBootstraping)
+ private void updateTokenMetadata(Token token, InetAddress endpoint, boolean isBootstraping)
{
tokenMetadata_.update(token, endpoint, isBootstraping);
if (!isBootstraping)
@@ -213,7 +197,7 @@
throw new RuntimeException(e);
}
- bootstrapSet = new HashSet<EndPoint>();
+ bootstrapSet = new HashSet<InetAddress>();
endPointSnitch_ = DatabaseDescriptor.getEndPointSnitch();
/* register the verb handlers */
@@ -223,7 +207,6 @@
MessagingService.instance().registerVerbHandlers(readRepairVerbHandler_, new ReadRepairVerbHandler());
MessagingService.instance().registerVerbHandlers(readVerbHandler_, new ReadVerbHandler());
MessagingService.instance().registerVerbHandlers(dataFileVerbHandler_, new DataFileVerbHandler() );
- MessagingService.instance().registerVerbHandlers(mbrshipCleanerVerbHandler_, new MembershipCleanerVerbHandler() );
MessagingService.instance().registerVerbHandlers(rangeVerbHandler_, new RangeVerbHandler());
// see BootStrapper for a summary of how the bootstrap verbs interact
MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler());
@@ -252,15 +235,13 @@
public void start() throws IOException
{
storageMetadata_ = SystemTable.initMetadata();
- tcpAddr_ = new EndPoint(FBUtilities.getHostAddress(), DatabaseDescriptor.getStoragePort());
- udpAddr_ = new EndPoint(FBUtilities.getHostAddress(), DatabaseDescriptor.getControlPort());
isBootstrapMode = DatabaseDescriptor.isAutoBootstrap()
- && !(DatabaseDescriptor.getSeeds().contains(udpAddr_.getHost()) || SystemTable.isBootstrapped());
+ && !(DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress()) || SystemTable.isBootstrapped());
/* Listen for application messages */
- MessagingService.instance().listen(tcpAddr_);
+ MessagingService.instance().listen(FBUtilities.getLocalAddress());
/* Listen for control messages */
- MessagingService.instance().listenUDP(udpAddr_);
+ MessagingService.instance().listenUDP(FBUtilities.getLocalAddress());
SelectorManager.getSelectorManager().start();
SelectorManager.getUdpSelectorManager().start();
@@ -271,7 +252,7 @@
// for bootstrap to get the load info it needs.
// (we won't be part of the storage ring though until we add a nodeId to our state, below.)
Gossiper.instance().register(this);
- Gossiper.instance().start(udpAddr_, storageMetadata_.getGeneration());
+ Gossiper.instance().start(FBUtilities.getLocalAddress(), storageMetadata_.getGeneration());
if (isBootstrapMode)
{
@@ -280,7 +261,7 @@
else
{
SystemTable.setBootstrapped();
- tokenMetadata_.update(storageMetadata_.getToken(), StorageService.tcpAddr_, isBootstrapMode);
+ tokenMetadata_.update(storageMetadata_.getToken(), FBUtilities.getLocalAddress(), isBootstrapMode);
}
// Gossip my token.
@@ -301,7 +282,7 @@
}
/* TODO: used for testing */
- public void updateTokenMetadataUnsafe(Token token, EndPoint endpoint)
+ public void updateTokenMetadataUnsafe(Token token, InetAddress endpoint)
{
tokenMetadata_.update(token, endpoint);
}
@@ -312,13 +293,13 @@
}
/*
- * Given an EndPoint this method will report if the
+ * Given an InetAddress this method will report if the
* endpoint is in the same data center as the local
* storage endpoint.
*/
- public boolean isInSameDataCenter(EndPoint endpoint) throws IOException
+ public boolean isInSameDataCenter(InetAddress endpoint) throws IOException
{
- return endPointSnitch_.isInSameDataCenter(StorageService.tcpAddr_, endpoint);
+ return endPointSnitch_.isInSameDataCenter(FBUtilities.getLocalAddress(), endpoint);
}
/*
@@ -326,7 +307,7 @@
* sure that the N replicas are in sync. We do this in the
* background when we do not care much about consistency.
*/
- public void doConsistencyCheck(Row row, List<EndPoint> endpoints, ReadCommand command)
+ public void doConsistencyCheck(Row row, List<InetAddress> endpoints, ReadCommand command)
{
Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, command);
consistencyManager_.submit(consistencySentinel);
@@ -335,11 +316,11 @@
public Map<Range, List<String>> getRangeToEndPointMap()
{
/* Get the token to endpoint map. */
- Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ Map<Token, InetAddress> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
/* All the ranges for the tokens */
Range[] ranges = getAllRanges(tokenToEndPointMap.keySet());
Map<Range, List<String>> map = new HashMap<Range, List<String>>();
- for (Map.Entry<Range,List<EndPoint>> entry : constructRangeToEndPointMap(ranges).entrySet())
+ for (Map.Entry<Range,List<InetAddress>> entry : constructRangeToEndPointMap(ranges).entrySet())
{
map.put(entry.getKey(), stringify(entry.getValue()));
}
@@ -352,14 +333,14 @@
* @param ranges
* @return mapping of ranges to the replicas responsible for them.
*/
- public Map<Range, List<EndPoint>> constructRangeToEndPointMap(Range[] ranges)
+ public Map<Range, List<InetAddress>> constructRangeToEndPointMap(Range[] ranges)
{
- Map<Range, List<EndPoint>> rangeToEndPointMap = new HashMap<Range, List<EndPoint>>();
+ Map<Range, List<InetAddress>> rangeToEndPointMap = new HashMap<Range, List<InetAddress>>();
for (Range range : ranges)
{
- EndPoint[] endpoints = replicationStrategy_.getReadStorageEndPoints(range.right());
+ InetAddress[] endpoints = replicationStrategy_.getReadStorageEndPoints(range.right());
// create a new ArrayList since a bunch of methods like to mutate the endpointmap List
- rangeToEndPointMap.put(range, new ArrayList<EndPoint>(Arrays.asList(endpoints)));
+ rangeToEndPointMap.put(range, new ArrayList<InetAddress>(Arrays.asList(endpoints)));
}
return rangeToEndPointMap;
}
@@ -371,15 +352,15 @@
* @param tokenToEndPointMap mapping of token to endpoints.
* @return mapping of ranges to the replicas responsible for them.
*/
- public Map<Range, List<EndPoint>> constructRangeToEndPointMap(Range[] ranges, Map<Token, EndPoint> tokenToEndPointMap)
+ public Map<Range, List<InetAddress>> constructRangeToEndPointMap(Range[] ranges, Map<Token, InetAddress> tokenToEndPointMap)
{
if (logger_.isDebugEnabled())
logger_.debug("Constructing range to endpoint map ...");
- Map<Range, List<EndPoint>> rangeToEndPointMap = new HashMap<Range, List<EndPoint>>();
+ Map<Range, List<InetAddress>> rangeToEndPointMap = new HashMap<Range, List<InetAddress>>();
for ( Range range : ranges )
{
- EndPoint[] endpoints = replicationStrategy_.getReadStorageEndPoints(range.right(), tokenToEndPointMap);
- rangeToEndPointMap.put(range, new ArrayList<EndPoint>( Arrays.asList(endpoints) ) );
+ InetAddress[] endpoints = replicationStrategy_.getReadStorageEndPoints(range.right(), tokenToEndPointMap);
+ rangeToEndPointMap.put(range, new ArrayList<InetAddress>( Arrays.asList(endpoints) ) );
}
if (logger_.isDebugEnabled())
logger_.debug("Done constructing range to endpoint map ...");
@@ -391,9 +372,8 @@
* we are interested in new tokens as a result of a new node or an
* existing node moving to a new location on the ring.
*/
- public void onChange(EndPoint endpoint, EndPointState epState)
+ public void onChange(InetAddress endpoint, EndPointState epState)
{
- EndPoint ep = new EndPoint(endpoint.getHost(), DatabaseDescriptor.getStoragePort());
/* node identifier for this endpoint on the identifier space */
ApplicationState nodeIdState = epState.getApplicationState(StorageService.nodeId_);
/* Check if this has a bootstrapping state message */
@@ -401,14 +381,14 @@
if (bootstrapState)
{
if (logger_.isDebugEnabled())
- logger_.debug(ep + " is in bootstrap state.");
+ logger_.debug(endpoint + " is in bootstrap state.");
}
if (nodeIdState != null)
{
Token newToken = getPartitioner().getTokenFactory().fromString(nodeIdState.getState());
if (logger_.isDebugEnabled())
logger_.debug("CHANGE IN STATE FOR " + endpoint + " - has token " + nodeIdState.getState());
- Token oldToken = tokenMetadata_.getToken(ep);
+ Token oldToken = tokenMetadata_.getToken(endpoint);
if ( oldToken != null )
{
@@ -421,8 +401,8 @@
if ( !oldToken.equals(newToken) )
{
if (logger_.isDebugEnabled())
- logger_.debug("Relocation for endpoint " + ep);
- updateTokenMetadata(newToken, ep, bootstrapState);
+ logger_.debug("Relocation for endpoint " + endpoint);
+ updateTokenMetadata(newToken, endpoint, bootstrapState);
}
else
{
@@ -431,7 +411,7 @@
* Deliver the hints that we have for this endpoint.
*/
if (logger_.isDebugEnabled())
- logger_.debug("Sending hinted data to " + ep);
+ logger_.debug("Sending hinted data to " + endpoint);
deliverHints(endpoint);
}
}
@@ -440,7 +420,7 @@
/*
* This is a new node and we just update the token map.
*/
- updateTokenMetadata(newToken, ep, bootstrapState);
+ updateTokenMetadata(newToken, endpoint, bootstrapState);
}
}
else
@@ -452,8 +432,8 @@
if ( epState.isAlive() && tokenMetadata_.isKnownEndPoint(endpoint) )
{
if (logger_.isDebugEnabled())
- logger_.debug("EndPoint " + ep + " just recovered from a partition. Sending hinted data.");
- deliverHints(ep);
+ logger_.debug("InetAddress " + endpoint + " just recovered from a partition. Sending hinted data.");
+ deliverHints(endpoint);
}
}
}
@@ -472,15 +452,15 @@
public Map<String, String> getLoadMap()
{
Map<String, String> map = new HashMap<String, String>();
- for (Map.Entry<EndPoint,Double> entry : StorageLoadBalancer.instance().getLoadInfo().entrySet())
+ for (Map.Entry<InetAddress,Double> entry : StorageLoadBalancer.instance().getLoadInfo().entrySet())
{
- map.put(entry.getKey().getHost(), FileUtils.stringifyFileSize(entry.getValue()));
+ map.put(entry.getKey().getHostAddress(), FileUtils.stringifyFileSize(entry.getValue()));
}
// gossiper doesn't bother sending to itself, so if there are no other nodes around
// we need to cheat to get load information for the local node
- if (!map.containsKey(getLocalControlEndPoint().getHost()))
+ if (!map.containsKey(FBUtilities.getLocalAddress().getHostAddress()))
{
- map.put(getLocalControlEndPoint().getHost(), getLoadString());
+ map.put(FBUtilities.getLocalAddress().getHostAddress(), getLoadString());
}
return map;
}
@@ -496,7 +476,7 @@
/* update the token on disk */
SystemTable.updateToken(token);
/* Update the token maps */
- tokenMetadata_.update(token, StorageService.tcpAddr_);
+ tokenMetadata_.update(token, FBUtilities.getLocalAddress());
/* Gossip this new token for the local storage instance */
ApplicationState state = new ApplicationState(StorageService.getPartitioner().getTokenFactory().toString(token));
Gossiper.instance().addApplicationState(StorageService.nodeId_, state);
@@ -509,7 +489,7 @@
* @param endpoint remove the token state associated with this
* endpoint.
*/
- public void removeTokenState(EndPoint endpoint)
+ public void removeTokenState(InetAddress endpoint)
{
tokenMetadata_.remove(endpoint);
/* Remove the state from the Gossiper */
@@ -520,14 +500,14 @@
* Deliver hints to the specified node when it has crashed
* and come back up/ marked as alive after a network partition
*/
- public final void deliverHints(EndPoint endpoint)
+ public final void deliverHints(InetAddress endpoint)
{
HintedHandOffManager.instance().deliverHints(endpoint);
}
public Token getLocalToken()
{
- return tokenMetadata_.getToken(tcpAddr_);
+ return tokenMetadata_.getToken(FBUtilities.getLocalAddress());
}
/* This methods belong to the MBean interface */
@@ -547,29 +527,29 @@
return stringify(Gossiper.instance().getUnreachableMembers());
}
- private Set<String> stringify(Set<EndPoint> endPoints)
+ private Set<String> stringify(Set<InetAddress> endPoints)
{
Set<String> stringEndPoints = new HashSet<String>();
- for (EndPoint ep : endPoints)
+ for (InetAddress ep : endPoints)
{
- stringEndPoints.add(ep.getHost());
+ stringEndPoints.add(ep.getHostAddress());
}
return stringEndPoints;
}
- private List<String> stringify(List<EndPoint> endPoints)
+ private List<String> stringify(List<InetAddress> endPoints)
{
List<String> stringEndPoints = new ArrayList<String>();
- for (EndPoint ep : endPoints)
+ for (InetAddress ep : endPoints)
{
- stringEndPoints.add(ep.getHost());
+ stringEndPoints.add(ep.getHostAddress());
}
return stringEndPoints;
}
public int getCurrentGenerationNumber()
{
- return Gossiper.instance().getCurrentGenerationNumber(udpAddr_);
+ return Gossiper.instance().getCurrentGenerationNumber(FBUtilities.getLocalAddress());
}
public void forceTableCleanup() throws IOException
@@ -591,51 +571,6 @@
table.forceCompaction();
}
}
-
- public void forceHandoff(List<String> dataDirectories, String host) throws IOException
- {
- List<File> filesList = new ArrayList<File>();
- List<StreamContextManager.StreamContext> streamContexts = new ArrayList<StreamContextManager.StreamContext>();
-
- for (String dataDir : dataDirectories)
- {
- File directory = new File(dataDir);
- Collections.addAll(filesList, directory.listFiles());
-
-
- for (File tableDir : directory.listFiles())
- {
- String tableName = tableDir.getName();
-
- for (File file : tableDir.listFiles())
- {
- streamContexts.add(new StreamContextManager.StreamContext(file.getAbsolutePath(), file.length(), tableName));
- if (logger_.isDebugEnabled())
- logger_.debug("Stream context metadata " + streamContexts);
- }
- }
- }
-
- if ( streamContexts.size() > 0 )
- {
- EndPoint target = new EndPoint(host, DatabaseDescriptor.getStoragePort());
- /* Set up the stream manager with the files that need to streamed */
- final StreamContextManager.StreamContext[] contexts = streamContexts.toArray(new StreamContextManager.StreamContext[streamContexts.size()]);
- StreamManager.instance(target).addFilesToStream(contexts);
- /* Send the bootstrap initiate message */
- final StreamContextManager.StreamContext[] bootContexts = streamContexts.toArray(new StreamContextManager.StreamContext[streamContexts.size()]);
- BootstrapInitiateMessage biMessage = new BootstrapInitiateMessage(bootContexts);
- Message message = BootstrapInitiateMessage.makeBootstrapInitiateMessage(biMessage);
- if (logger_.isDebugEnabled())
- logger_.debug("Sending a bootstrap initiate message to " + target + " ...");
- MessagingService.instance().sendOneWay(message, target);
- if (logger_.isDebugEnabled())
- logger_.debug("Waiting for transfer to " + target + " to complete");
- StreamManager.instance(target).waitForStreamCompletion();
- if (logger_.isDebugEnabled())
- logger_.debug("Done with transfer to " + target);
- }
- }
/**
* Takes the snapshot for a given table.
@@ -705,7 +640,7 @@
* This method returns the predecessor of the endpoint ep on the identifier
* space.
*/
- EndPoint getPredecessor(EndPoint ep)
+ InetAddress getPredecessor(InetAddress ep)
{
Token token = tokenMetadata_.getToken(ep);
return tokenMetadata_.getEndPoint(replicationStrategy_.getPredecessor(token, tokenMetadata_.cloneTokenEndPointMap()));
@@ -715,7 +650,7 @@
* This method returns the successor of the endpoint ep on the identifier
* space.
*/
- public EndPoint getSuccessor(EndPoint ep)
+ public InetAddress getSuccessor(InetAddress ep)
{
Token token = tokenMetadata_.getToken(ep);
return tokenMetadata_.getEndPoint(replicationStrategy_.getSuccessor(token, tokenMetadata_.cloneTokenEndPointMap()));
@@ -726,7 +661,7 @@
* @param ep endpoint we are interested in.
* @return range for the specified endpoint.
*/
- public Range getPrimaryRangeForEndPoint(EndPoint ep)
+ public Range getPrimaryRangeForEndPoint(InetAddress ep)
{
Token right = tokenMetadata_.getToken(ep);
return replicationStrategy_.getPrimaryRangeFor(right, tokenMetadata_.cloneTokenEndPointMap());
@@ -737,7 +672,7 @@
* @param ep endpoint we are interested in.
* @return ranges for the specified endpoint.
*/
- Set<Range> getRangesForEndPoint(EndPoint ep)
+ Set<Range> getRangesForEndPoint(InetAddress ep)
{
return replicationStrategy_.getRangeMap().get(ep);
}
@@ -771,11 +706,11 @@
* @param key - key for which we need to find the endpoint
* @return value - the endpoint responsible for this key
*/
- public EndPoint getPrimary(String key)
+ public InetAddress getPrimary(String key)
{
- EndPoint endpoint = StorageService.tcpAddr_;
+ InetAddress endpoint = FBUtilities.getLocalAddress();
Token token = partitioner_.getToken(key);
- Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ Map<Token, InetAddress> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
if (tokens.size() > 0)
{
@@ -809,8 +744,8 @@
*/
public boolean isPrimary(String key)
{
- EndPoint endpoint = getPrimary(key);
- return StorageService.tcpAddr_.equals(endpoint);
+ InetAddress endpoint = getPrimary(key);
+ return FBUtilities.getLocalAddress().equals(endpoint);
}
/**
@@ -820,7 +755,7 @@
* @param key - key for which we need to find the endpoint return value -
* the endpoint responsible for this key
*/
- public EndPoint[] getReadStorageEndPoints(String key)
+ public InetAddress[] getReadStorageEndPoints(String key)
{
return replicationStrategy_.getReadStorageEndPoints(partitioner_.getToken(key));
}
@@ -832,12 +767,12 @@
* @param key - key for which we need to find the endpoint return value -
* the endpoint responsible for this key
*/
- public List<EndPoint> getLiveReadStorageEndPoints(String key)
+ public List<InetAddress> getLiveReadStorageEndPoints(String key)
{
- List<EndPoint> liveEps = new ArrayList<EndPoint>();
- EndPoint[] endpoints = getReadStorageEndPoints(key);
+ List<InetAddress> liveEps = new ArrayList<InetAddress>();
+ InetAddress[] endpoints = getReadStorageEndPoints(key);
- for ( EndPoint endpoint : endpoints )
+ for ( InetAddress endpoint : endpoints )
{
if ( FailureDetector.instance().isAlive(endpoint) )
liveEps.add(endpoint);
@@ -853,7 +788,7 @@
* @param key - key for which we need to find the endpoint return value -
* the endpoint responsible for this key
*/
- public Map<EndPoint, EndPoint> getHintedStorageEndpointMap(String key, EndPoint[] naturalEndpoints)
+ public Map<InetAddress, InetAddress> getHintedStorageEndpointMap(String key, InetAddress[] naturalEndpoints)
{
return replicationStrategy_.getHintedStorageEndPoints(partitioner_.getToken(key), naturalEndpoints);
}
@@ -862,12 +797,12 @@
* This function finds the most suitable endpoint given a key.
* It checks for locality and alive test.
*/
- public EndPoint findSuitableEndPoint(String key) throws IOException, UnavailableException
+ public InetAddress findSuitableEndPoint(String key) throws IOException, UnavailableException
{
- EndPoint[] endpoints = getReadStorageEndPoints(key);
- for(EndPoint endPoint: endpoints)
+ InetAddress[] endpoints = getReadStorageEndPoints(key);
+ for(InetAddress endPoint: endpoints)
{
- if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
+ if(endPoint.equals(FBUtilities.getLocalAddress()))
{
return endPoint;
}
@@ -889,7 +824,7 @@
if ( FailureDetector.instance().isAlive(endpoints[j]))
{
if (logger_.isDebugEnabled())
- logger_.debug("EndPoint " + endpoints[j] + " is alive so get data from it.");
+ logger_.debug("InetAddress " + endpoints[j] + " is alive so get data from it.");
return endpoints[j];
}
}
@@ -897,7 +832,7 @@
throw new UnavailableException(); // no nodes that could contain key are alive
}
- Map<Token, EndPoint> getLiveEndPointMap()
+ Map<Token, InetAddress> getLiveEndPointMap()
{
return tokenMetadata_.cloneTokenEndPointMap();
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Wed Oct 21 18:26:02 2009
@@ -25,7 +25,7 @@
import java.util.Set;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
public interface StorageServiceMBean
@@ -85,17 +85,6 @@
public void forceTableCleanup() throws IOException;
/**
- * Stream the files in the bootstrap directory over to the
- * node being bootstrapped. This is used in case of normal
- * bootstrap failure. Use a tool to re-calculate the cardinality
- * at a later point at the destination.
- * @param directories colon separated list of directories from where
- * files need to be picked up.
- * @param target endpoint receiving data.
- */
- public void forceHandoff(List<String> directories, String target) throws IOException;
-
- /**
* Takes the snapshot for a given table.
*
* @param tableName the name of the table.
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java Wed Oct 21 18:26:02 2009
@@ -22,14 +22,14 @@
import java.io.IOException;
import java.util.*;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import java.net.InetAddress;
+
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.io.StreamContextManager;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
import org.apache.log4j.Logger;
/*
@@ -40,9 +40,9 @@
{
private static Logger logger_ = Logger.getLogger( StreamManager.class );
- private static Map<EndPoint, StreamManager> streamManagers_ = new HashMap<EndPoint, StreamManager>();
+ private static Map<InetAddress, StreamManager> streamManagers_ = new HashMap<InetAddress, StreamManager>();
- public static StreamManager instance(EndPoint to)
+ public static StreamManager instance(InetAddress to)
{
StreamManager streamManager = streamManagers_.get(to);
if ( streamManager == null )
@@ -54,10 +54,10 @@
}
private List<File> filesToStream_ = new ArrayList<File>();
- private EndPoint to_;
+ private InetAddress to_;
private long totalBytesToStream_ = 0L;
- private StreamManager(EndPoint to)
+ private StreamManager(InetAddress to)
{
to_ = to;
}
@@ -80,7 +80,7 @@
File file = filesToStream_.get(0);
if (logger_.isDebugEnabled())
logger_.debug("Streaming file " + file + " ...");
- MessagingService.instance().stream(file.getAbsolutePath(), 0L, file.length(), StorageService.getLocalStorageEndPoint(), to_);
+ MessagingService.instance().stream(file.getAbsolutePath(), 0L, file.length(), FBUtilities.getLocalAddress(), to_);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java Wed Oct 21 18:26:02 2009
@@ -25,7 +25,7 @@
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.Table;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.LogUtil;
@@ -40,9 +40,9 @@
*/
private static boolean checkIfProcessKey(String key)
{
- EndPoint[] endPoints = StorageService.instance().getReadStorageEndPoints(key);
- EndPoint localEndPoint = StorageService.getLocalStorageEndPoint();
- for(EndPoint endPoint : endPoints)
+ InetAddress[] endPoints = StorageService.instance().getReadStorageEndPoints(key);
+ InetAddress localEndPoint = FBUtilities.getLocalAddress();
+ for(InetAddress endPoint : endPoints)
{
if(endPoint.equals(localEndPoint))
return true;
@@ -65,7 +65,7 @@
/* Sleep for proper discovery */
Thread.sleep(240000);
/* Create the file for the missing keys */
- RandomAccessFile raf = new RandomAccessFile( "Missing-" + FBUtilities.getHostAddress() + ".dat", "rw");
+ RandomAccessFile raf = new RandomAccessFile( "Missing-" + FBUtilities.getLocalAddress() + ".dat", "rw");
/* Start reading the file that contains the keys */
BufferedReader bufReader = new BufferedReader( new InputStreamReader( new FileInputStream(args[0]) ), KeyChecker.bufSize_ );
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Wed Oct 21 18:26:02 2009
@@ -45,7 +45,7 @@
import org.apache.cassandra.db.CompactionManager;
import org.apache.cassandra.db.CompactionManagerMBean;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.service.StorageServiceMBean;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java Wed Oct 21 18:26:02 2009
@@ -23,7 +23,9 @@
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.SelectorManager;
@@ -61,20 +63,20 @@
port = Integer.valueOf(ipPortPair[1]);
}
- EndPoint target = new EndPoint(ipPortPair[0], port);
+ InetSocketAddress target = new InetSocketAddress(ipPortPair[0], port);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
Token.serializer().serialize(token, dos);
/* Construct the token update message to be sent */
- Message tokenUpdateMessage = new Message(new EndPoint(FBUtilities.getHostAddress(), port_),
+ Message tokenUpdateMessage = new Message(target.getAddress(),
"",
StorageService.tokenVerbHandler_,
bos.toByteArray());
System.out.println("Sending a token update message to " + target);
- MessagingService.instance().sendOneWay(tokenUpdateMessage, target);
+ MessagingService.instance().sendOneWay(tokenUpdateMessage, target.getAddress());
Thread.sleep(TokenUpdater.waitTime_);
System.out.println("Done sending the update message");
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Wed Oct 21 18:26:02 2009
@@ -23,8 +23,6 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.MessageDigest;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
import java.util.*;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
@@ -51,23 +49,22 @@
return result.toArray( new String[0] );
}
- public static InetAddress getLocalAddress() throws UnknownHostException
+ public static InetAddress getLocalAddress()
{
- if ( localInetAddress_ == null )
- localInetAddress_ = InetAddress.getLocalHost();
+ if (localInetAddress_ == null)
+ try
+ {
+ localInetAddress_ = DatabaseDescriptor.getListenAddress() == null
+ ? InetAddress.getLocalHost()
+ : DatabaseDescriptor.getListenAddress();
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
return localInetAddress_;
}
- public static String getHostAddress() throws UnknownHostException
- {
- InetAddress inetAddr = getLocalAddress();
- if (DatabaseDescriptor.getListenAddress() != null)
- {
- inetAddr = InetAddress.getByName(DatabaseDescriptor.getListenAddress());
- }
- return inetAddr.getHostAddress();
- }
-
public static byte[] toByteArray(int i)
{
byte[] bytes = new byte[4];
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java Wed Oct 21 18:26:02 2009
@@ -17,10 +17,11 @@
*/
package org.apache.cassandra.client;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.service.Cassandra;
import org.apache.cassandra.service.Column;
import org.apache.cassandra.service.ColumnPath;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
@@ -64,14 +65,14 @@
String row = "row" + nRows;
ColumnPath col = new ColumnPath("Standard1", null, "col1".getBytes());
- EndPoint endPoints[] = ringCache.getEndPoint(row);
+ InetAddress endPoints[] = ringCache.getEndPoint(row);
String hosts="";
for (int i=0; i<endPoints.length; i++)
hosts = hosts + ((i>0) ? "," : "") + endPoints[i];
System.out.println("hosts with key " + row + " : " + hosts + "; choose " + endPoints[0]);
// now, read the row back directly from the host owning the row locally
- setup(endPoints[0].getHost(), endPoints[0].getPort());
+ setup(endPoints[0].getHostAddress(), DatabaseDescriptor.getThriftPort());
thriftClient.insert(table, row, col, "val1".getBytes(), 1, 1);
Column column=thriftClient.get(table, row, col, 1).column;
System.out.println("read row " + row + " " + new String(column.name) + ":" + new String(column.value) + ":" + column.timestamp);
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Wed Oct 21 18:26:02 2009
@@ -31,7 +31,7 @@
import static junit.framework.Assert.assertEquals;
import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.CollatingOrderPreservingPartitioner;
@@ -123,7 +123,7 @@
Range r = new Range(partitioner.getToken("0"), partitioner.getToken("zzzzzzz"));
ranges.add(r);
- List<SSTableReader> fileList = store.forceAntiCompaction(ranges, new EndPoint("127.0.0.1", 9150));
+ List<SSTableReader> fileList = store.forceAntiCompaction(ranges, InetAddress.getByName("127.0.0.1"));
assert fileList.size() >= 1;
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Wed Oct 21 18:26:02 2009
@@ -23,26 +23,28 @@
import java.util.List;
import java.util.Map;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
import org.apache.cassandra.service.StorageService;
import org.junit.Test;
public class BootStrapperTest {
@Test
- public void testSourceTargetComputation()
+ public void testSourceTargetComputation() throws UnknownHostException
{
int numOldNodes = 3;
IPartitioner p = generateOldTokens(numOldNodes);
Token newToken = p.getDefaultToken();
- EndPoint newEndPoint = new EndPoint("1.2.3.10",100);
+ InetAddress newEndPoint = InetAddress.getByName("1.2.3.10");
/* New token needs to be part of the map for the algorithm
* to calculate the ranges correctly
*/
StorageService.instance().updateTokenMetadataUnsafe(newToken, newEndPoint);
- BootStrapper b = new BootStrapper(new EndPoint[]{newEndPoint}, newToken );
+ BootStrapper b = new BootStrapper(new InetAddress[]{newEndPoint}, newToken );
Map<Range,List<BootstrapSourceTarget>> res = b.getRangesWithSourceTarget();
int transferCount = 0;
@@ -55,23 +57,23 @@
}
/* Only 1 transfer from old node to new node */
assertEquals(1, transferCount);
- Map<EndPoint, Map<EndPoint,List<Range>>> temp = LeaveJoinProtocolHelper.getWorkMap(res);
+ Map<InetAddress, Map<InetAddress,List<Range>>> temp = LeaveJoinProtocolHelper.getWorkMap(res);
assertEquals(1, temp.keySet().size());
assertEquals(1, temp.entrySet().size());
- Map<EndPoint,Map<EndPoint,List<Range>>> res2 = LeaveJoinProtocolHelper.filterRangesForTargetEndPoint(temp, newEndPoint);
+ Map<InetAddress,Map<InetAddress,List<Range>>> res2 = LeaveJoinProtocolHelper.filterRangesForTargetEndPoint(temp, newEndPoint);
/* After filtering, still only 1 transfer */
assertEquals(1, res2.keySet().size());
assertEquals(1, res2.entrySet().size());
- assertTrue(((Map<EndPoint,List<Range>>)res2.values().toArray()[0]).containsKey(newEndPoint));
+ assertTrue(((Map<InetAddress,List<Range>>)res2.values().toArray()[0]).containsKey(newEndPoint));
}
- private IPartitioner generateOldTokens(int numOldNodes)
+ private IPartitioner generateOldTokens(int numOldNodes) throws UnknownHostException
{
IPartitioner p = new RandomPartitioner();
for (int i = 0 ; i< numOldNodes; i++)
{
- EndPoint e = new EndPoint("127.0.0."+i, 100);
+ InetAddress e = InetAddress.getByName("127.0.0." + i);
Token t = p.getDefaultToken();
StorageService.instance().updateTokenMetadataUnsafe(t, e);
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java Wed Oct 21 18:26:02 2009
@@ -30,7 +30,7 @@
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.dht.*;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.net.io.StreamContextManager;
import org.apache.cassandra.io.SSTableReader;
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java Wed Oct 21 18:26:02 2009
@@ -27,7 +27,7 @@
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.junit.Test;
public class GossipDigestTest
@@ -36,7 +36,7 @@
@Test
public void test() throws IOException
{
- EndPoint endPoint = new EndPoint("127.0.0.1", 3333);
+ InetAddress endPoint = InetAddress.getByName("127.0.0.1");
int generation = 0;
int maxVersion = 123;
GossipDigest expected = new GossipDigest(endPoint, generation, maxVersion);
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java Wed Oct 21 18:26:02 2009
@@ -31,12 +31,13 @@
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.dht.OrderPreservingPartitioner;
import org.apache.cassandra.dht.StringToken;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
public class RackUnawareStrategyTest
{
@Test
- public void testBigIntegerStorageEndPoints()
+ public void testBigIntegerStorageEndPoints() throws UnknownHostException
{
TokenMetadata tmd = new TokenMetadata();
IPartitioner partitioner = new RandomPartitioner();
@@ -52,7 +53,7 @@
}
@Test
- public void testStringStorageEndPoints()
+ public void testStringStorageEndPoints() throws UnknownHostException
{
TokenMetadata tmd = new TokenMetadata();
IPartitioner partitioner = new OrderPreservingPartitioner();
@@ -69,19 +70,19 @@
// given a list of endpoint tokens, and a set of key tokens falling between the endpoint tokens,
// make sure that the Strategy picks the right endpoints for the keys.
- private void testGetStorageEndPoints(TokenMetadata tmd, AbstractReplicationStrategy strategy, Token[] endPointTokens, Token[] keyTokens)
+ private void testGetStorageEndPoints(TokenMetadata tmd, AbstractReplicationStrategy strategy, Token[] endPointTokens, Token[] keyTokens) throws UnknownHostException
{
- List<EndPoint> hosts = new ArrayList<EndPoint>();
+ List<InetAddress> hosts = new ArrayList<InetAddress>();
for (int i = 0; i < endPointTokens.length; i++)
{
- EndPoint ep = new EndPoint("127.0.0." + String.valueOf(i + 1), 7001);
+ InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
tmd.update(endPointTokens[i], ep);
hosts.add(ep);
}
for (int i = 0; i < keyTokens.length; i++)
{
- EndPoint[] endPoints = strategy.getReadStorageEndPoints(keyTokens[i]);
+ InetAddress[] endPoints = strategy.getReadStorageEndPoints(keyTokens[i]);
assertEquals(3, endPoints.length);
for (int j = 0; j < endPoints.length; j++)
{
@@ -91,7 +92,7 @@
}
@Test
- public void testGetStorageEndPointsDuringBootstrap()
+ public void testGetStorageEndPointsDuringBootstrap() throws UnknownHostException
{
TokenMetadata tmd = new TokenMetadata();
IPartitioner partitioner = new RandomPartitioner();
@@ -106,24 +107,24 @@
keyTokens[i] = new BigIntegerToken(String.valueOf(10 * i + 5));
}
- List<EndPoint> hosts = new ArrayList<EndPoint>();
+ List<InetAddress> hosts = new ArrayList<InetAddress>();
for (int i = 0; i < endPointTokens.length; i++)
{
- EndPoint ep = new EndPoint("127.0.0." + String.valueOf(i + 1), 7001);
+ InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
tmd.update(endPointTokens[i], ep);
hosts.add(ep);
}
//Add bootstrap node id=6
Token bsToken = new BigIntegerToken(String.valueOf(25));
- EndPoint bootstrapEndPoint = new EndPoint("127.0.0.6", 7001);
+ InetAddress bootstrapEndPoint = InetAddress.getByName("127.0.0.6");
tmd.update(bsToken, bootstrapEndPoint, true);
for (int i = 0; i < keyTokens.length; i++)
{
- EndPoint[] endPoints = strategy.getWriteStorageEndPoints(keyTokens[i], strategy.getReadStorageEndPoints(keyTokens[i]));
+ InetAddress[] endPoints = strategy.getWriteStorageEndPoints(keyTokens[i], strategy.getReadStorageEndPoints(keyTokens[i]));
assertTrue(endPoints.length >=3);
- List<EndPoint> endPointsList = Arrays.asList(endPoints);
+ List<InetAddress> endPointsList = Arrays.asList(endPoints);
for (int j = 0; j < 3; j++)
{
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/CompactEndPointSerializationHelperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/CompactEndPointSerializationHelperTest.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/CompactEndPointSerializationHelperTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/CompactEndPointSerializationHelperTest.java Wed Oct 21 18:26:02 2009
@@ -1,6 +1,7 @@
package org.apache.cassandra.net;
import java.net.UnknownHostException;
+import java.net.InetAddress;
import org.junit.Test;
@@ -11,10 +12,10 @@
@Test
public void testSerialize() throws UnknownHostException
{
- EndPoint ep = new EndPoint(FBUtilities.getHostAddress(), 7000);
+ InetAddress ep = FBUtilities.getLocalAddress();
byte[] bytes = ep.getAddress();
System.out.println(bytes.length);
- EndPoint ep2 = EndPoint.getByAddress(bytes);
+ InetAddress ep2 = InetAddress.getByAddress(bytes);
System.out.println(ep2);
}
}