You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/21 20:26:04 UTC
svn commit: r828130 [2/3] - in /incubator/cassandra/trunk:
contrib/bmt_example/
contrib/property_snitch/src/java/org/apache/cassandra/locator/
src/java/org/apache/cassandra/client/ src/java/org/apache/cassandra/config/
src/java/org/apache/cassandra/db/...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java Wed Oct 21 18:26:02 2009
@@ -23,7 +23,7 @@
import java.io.IOException;
import java.util.*;
import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.net.*;
@@ -40,19 +40,19 @@
serializer_ = new GossipDigestAck2MessageSerializer();
}
- Map<EndPoint, EndPointState> epStateMap_ = new HashMap<EndPoint, EndPointState>();
+ Map<InetAddress, EndPointState> epStateMap_ = new HashMap<InetAddress, EndPointState>();
public static ICompactSerializer<GossipDigestAck2Message> serializer()
{
return serializer_;
}
- GossipDigestAck2Message(Map<EndPoint, EndPointState> epStateMap)
+ GossipDigestAck2Message(Map<InetAddress, EndPointState> epStateMap)
{
epStateMap_ = epStateMap;
}
- Map<EndPoint, EndPointState> getEndPointStateMap()
+ Map<InetAddress, EndPointState> getEndPointStateMap()
{
return epStateMap_;
}
@@ -68,7 +68,7 @@
public GossipDigestAck2Message deserialize(DataInputStream dis) throws IOException
{
- Map<EndPoint, EndPointState> epStateMap = EndPointStatesSerializationHelper.deserialize(dis);
+ Map<InetAddress, EndPointState> epStateMap = EndPointStatesSerializationHelper.deserialize(dis);
return new GossipDigestAck2Message(epStateMap);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java Wed Oct 21 18:26:02 2009
@@ -24,7 +24,7 @@
import java.util.*;
import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
@@ -42,20 +42,20 @@
}
List<GossipDigest> gDigestList_ = new ArrayList<GossipDigest>();
- Map<EndPoint, EndPointState> epStateMap_ = new HashMap<EndPoint, EndPointState>();
+ Map<InetAddress, EndPointState> epStateMap_ = new HashMap<InetAddress, EndPointState>();
static ICompactSerializer<GossipDigestAckMessage> serializer()
{
return serializer_;
}
- GossipDigestAckMessage(List<GossipDigest> gDigestList, Map<EndPoint, EndPointState> epStateMap)
+ GossipDigestAckMessage(List<GossipDigest> gDigestList, Map<InetAddress, EndPointState> epStateMap)
{
gDigestList_ = gDigestList;
epStateMap_ = epStateMap;
}
- void addGossipDigest(EndPoint ep, int generation, int version)
+ void addGossipDigest(InetAddress ep, int generation, int version)
{
gDigestList_.add( new GossipDigest(ep, generation, version) );
}
@@ -65,7 +65,7 @@
return gDigestList_;
}
- Map<EndPoint, EndPointState> getEndPointStateMap()
+ Map<InetAddress, EndPointState> getEndPointStateMap()
{
return epStateMap_;
}
@@ -87,7 +87,7 @@
public GossipDigestAckMessage deserialize(DataInputStream dis) throws IOException
{
- Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>();
+ Map<InetAddress, EndPointState> epStateMap = new HashMap<InetAddress, EndPointState>();
List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis);
boolean bContinue = dis.readBoolean();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java Wed Oct 21 18:26:02 2009
@@ -25,7 +25,7 @@
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.utils.Log4jLogger;
import org.apache.log4j.Logger;
import org.apache.cassandra.utils.*;
@@ -115,15 +115,15 @@
{
private static Log4jLogger logger_ = new Log4jLogger(EndPointStatesSerializationHelper.class.getName());
- static boolean serialize(Map<EndPoint, EndPointState> epStateMap, DataOutputStream dos) throws IOException
+ static boolean serialize(Map<InetAddress, EndPointState> epStateMap, DataOutputStream dos) throws IOException
{
boolean bVal = true;
int estimate = 0;
int size = epStateMap.size();
dos.writeInt(size);
- Set<EndPoint> eps = epStateMap.keySet();
- for( EndPoint ep : eps )
+ Set<InetAddress> eps = epStateMap.keySet();
+ for( InetAddress ep : eps )
{
if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
{
@@ -142,10 +142,10 @@
return bVal;
}
- static Map<EndPoint, EndPointState> deserialize(DataInputStream dis) throws IOException
+ static Map<InetAddress, EndPointState> deserialize(DataInputStream dis) throws IOException
{
int size = dis.readInt();
- Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>();
+ Map<InetAddress, EndPointState> epStateMap = new HashMap<InetAddress, EndPointState>();
for ( int i = 0; i < size; ++i )
{
@@ -155,7 +155,7 @@
break;
}
// int length = dis.readInt();
- EndPoint ep = CompactEndPointSerializationHelper.deserialize(dis);
+ InetAddress ep = CompactEndPointSerializationHelper.deserialize(dis);
EndPointState epState = EndPointState.serializer().deserialize(dis);
epStateMap.put(ep, epState);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Wed Oct 21 18:26:02 2009
@@ -25,7 +25,7 @@
import org.apache.cassandra.concurrent.SingleThreadedStage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -109,7 +109,7 @@
}
private Timer gossipTimer_ = new Timer(false);
- private EndPoint localEndPoint_;
+ private InetAddress localEndPoint_;
private long aVeryLongTime_;
private Random random_ = new Random();
/* round robin index through live endpoint set */
@@ -119,16 +119,16 @@
private List<IEndPointStateChangeSubscriber> subscribers_ = new ArrayList<IEndPointStateChangeSubscriber>();
/* live member set */
- private Set<EndPoint> liveEndpoints_ = new HashSet<EndPoint>();
+ private Set<InetAddress> liveEndpoints_ = new HashSet<InetAddress>();
/* unreachable member set */
- private Set<EndPoint> unreachableEndpoints_ = new HashSet<EndPoint>();
+ private Set<InetAddress> unreachableEndpoints_ = new HashSet<InetAddress>();
/* initial seeds for joining the cluster */
- private Set<EndPoint> seeds_ = new HashSet<EndPoint>();
+ private Set<InetAddress> seeds_ = new HashSet<InetAddress>();
/* map where key is the endpoint and value is the state associated with the endpoint */
- Map<EndPoint, EndPointState> endPointStateMap_ = new Hashtable<EndPoint, EndPointState>();
+ Map<InetAddress, EndPointState> endPointStateMap_ = new Hashtable<InetAddress, EndPointState>();
private Gossiper()
{
@@ -155,24 +155,24 @@
subscribers_.remove(subscriber);
}
- public Set<EndPoint> getAllMembers()
+ public Set<InetAddress> getAllMembers()
{
- Set<EndPoint> allMbrs = new HashSet<EndPoint>();
+ Set<InetAddress> allMbrs = new HashSet<InetAddress>();
allMbrs.addAll(getLiveMembers());
allMbrs.addAll(getUnreachableMembers());
return allMbrs;
}
- public Set<EndPoint> getLiveMembers()
+ public Set<InetAddress> getLiveMembers()
{
- Set<EndPoint> liveMbrs = new HashSet<EndPoint>(liveEndpoints_);
- liveMbrs.add( new EndPoint( localEndPoint_.getHost(), localEndPoint_.getPort() ) );
+ Set<InetAddress> liveMbrs = new HashSet<InetAddress>(liveEndpoints_);
+ liveMbrs.add(localEndPoint_);
return liveMbrs;
}
- public Set<EndPoint> getUnreachableMembers()
+ public Set<InetAddress> getUnreachableMembers()
{
- return new HashSet<EndPoint>(unreachableEndpoints_);
+ return new HashSet<InetAddress>(unreachableEndpoints_);
}
/**
@@ -181,7 +181,7 @@
*
* param@ ep the endpoint to be removed from membership.
*/
- public synchronized void removeFromMembership(EndPoint ep)
+ public synchronized void removeFromMembership(InetAddress ep)
{
endPointStateMap_.remove(ep);
liveEndpoints_.remove(ep);
@@ -195,7 +195,7 @@
* param @ endpoint end point that is convicted.
*/
- public void convict(EndPoint endpoint)
+ public void convict(InetAddress endpoint)
{
EndPointState epState = endPointStateMap_.get(endpoint);
if ( epState != null )
@@ -208,7 +208,7 @@
*/
if ( liveEndpoints_.contains(endpoint) )
{
- logger_.info("EndPoint " + endpoint + " is now dead.");
+ logger_.info("InetAddress " + endpoint + " is now dead.");
isAlive(endpoint, epState, false);
/* Notify an endpoint is dead to interested parties. */
@@ -226,12 +226,12 @@
*
* param @ endpoint end point that is suspected.
*/
- public void suspect(EndPoint endpoint)
+ public void suspect(InetAddress endpoint)
{
EndPointState epState = endPointStateMap_.get(endpoint);
if ( epState.isAlive() )
{
- logger_.info("EndPoint " + endpoint + " is now dead.");
+ logger_.info("InetAddress " + endpoint + " is now dead.");
isAlive(endpoint, epState, false);
/* Notify an endpoint is dead to interested parties. */
@@ -265,7 +265,7 @@
*
* @param endpoint endpoint to be removed from the current membership.
*/
- void evictFromMembership(EndPoint endpoint)
+ void evictFromMembership(InetAddress endpoint)
{
unreachableEndpoints_.remove(endpoint);
}
@@ -280,7 +280,7 @@
int maxVersion = getMaxEndPointStateVersion(epState);
gDigests.add( new GossipDigest(localEndPoint_, generation, maxVersion) );
- for ( EndPoint liveEndPoint : liveEndpoints_ )
+ for ( InetAddress liveEndPoint : liveEndpoints_ )
{
epState = endPointStateMap_.get(liveEndPoint);
if ( epState != null )
@@ -311,9 +311,9 @@
int maxVersion = getMaxEndPointStateVersion(epState);
gDigests.add( new GossipDigest(localEndPoint_, generation, maxVersion) );
- List<EndPoint> endpoints = new ArrayList<EndPoint>( liveEndpoints_ );
+ List<InetAddress> endpoints = new ArrayList<InetAddress>( liveEndpoints_ );
Collections.shuffle(endpoints, random_);
- for ( EndPoint liveEndPoint : endpoints )
+ for ( InetAddress liveEndPoint : endpoints )
{
epState = endPointStateMap_.get(liveEndPoint);
if ( epState != null )
@@ -339,7 +339,7 @@
logger_.trace("Gossip Digests are : " + sb.toString());
}
- public int getCurrentGenerationNumber(EndPoint endpoint)
+ public int getCurrentGenerationNumber(InetAddress endpoint)
{
return endPointStateMap_.get(endpoint).getHeartBeatState().getGeneration();
}
@@ -377,14 +377,14 @@
boolean sendGossipToLiveNode(Message message)
{
int size = liveEndpoints_.size();
- List<EndPoint> eps = new ArrayList<EndPoint>(liveEndpoints_);
+ List<InetAddress> eps = new ArrayList<InetAddress>(liveEndpoints_);
if ( rrIndex_ >= size )
{
rrIndex_ = -1;
}
- EndPoint to = eps.get(++rrIndex_);
+ InetAddress to = eps.get(++rrIndex_);
if (logger_.isTraceEnabled())
logger_.trace("Sending a GossipDigestSynMessage to " + to + " ...");
MessagingService.instance().sendUdpOneWay(message, to);
@@ -398,13 +398,13 @@
* @param epSet a set of endpoint from which a random endpoint is chosen.
* @return true if the chosen endpoint is also a seed.
*/
- boolean sendGossip(Message message, Set<EndPoint> epSet)
+ boolean sendGossip(Message message, Set<InetAddress> epSet)
{
int size = epSet.size();
/* Generate a random number from 0 -> size */
- List<EndPoint> liveEndPoints = new ArrayList<EndPoint>(epSet);
+ List<InetAddress> liveEndPoints = new ArrayList<InetAddress>(epSet);
int index = (size == 1) ? 0 : random_.nextInt(size);
- EndPoint to = liveEndPoints.get(index);
+ InetAddress to = liveEndPoints.get(index);
if (logger_.isTraceEnabled())
logger_.trace("Sending a GossipDigestSynMessage to " + to + " ...");
MessagingService.instance().sendUdpOneWay(message, to);
@@ -465,9 +465,9 @@
void doStatusCheck()
{
- Set<EndPoint> eps = endPointStateMap_.keySet();
+ Set<InetAddress> eps = endPointStateMap_.keySet();
- for ( EndPoint endpoint : eps )
+ for ( InetAddress endpoint : eps )
{
if ( endpoint.equals(localEndPoint_) )
continue;
@@ -485,12 +485,12 @@
}
}
- EndPointState getEndPointStateForEndPoint(EndPoint ep)
+ EndPointState getEndPointStateForEndPoint(InetAddress ep)
{
return endPointStateMap_.get(ep);
}
- synchronized EndPointState getStateForVersionBiggerThan(EndPoint forEndpoint, int version)
+ synchronized EndPointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version)
{
EndPointState epState = endPointStateMap_.get(forEndpoint);
EndPointState reqdEndPointState = null;
@@ -536,7 +536,7 @@
* when a new node coming up multicasts the JoinMessage. Here we need
* to add the endPoint to the list of live endpoints.
*/
- synchronized void join(EndPoint from)
+ synchronized void join(InetAddress from)
{
if ( !from.equals( localEndPoint_ ) )
{
@@ -580,11 +580,11 @@
}
}
- void notifyFailureDetector(Map<EndPoint, EndPointState> remoteEpStateMap)
+ void notifyFailureDetector(Map<InetAddress, EndPointState> remoteEpStateMap)
{
IFailureDetector fd = FailureDetector.instance();
- Set<EndPoint> endpoints = remoteEpStateMap.keySet();
- for ( EndPoint endpoint : endpoints )
+ Set<InetAddress> endpoints = remoteEpStateMap.keySet();
+ for ( InetAddress endpoint : endpoints )
{
EndPointState remoteEndPointState = remoteEpStateMap.get(endpoint);
EndPointState localEndPointState = endPointStateMap_.get(endpoint);
@@ -616,18 +616,18 @@
}
}
- void markAlive(EndPoint addr, EndPointState localState)
+ void markAlive(InetAddress addr, EndPointState localState)
{
if (logger_.isTraceEnabled())
logger_.trace("marking as alive " + addr);
if ( !localState.isAlive() )
{
isAlive(addr, localState, true);
- logger_.info("EndPoint " + addr + " is now UP");
+ logger_.info("InetAddress " + addr + " is now UP");
}
}
- private void handleNewJoin(EndPoint ep, EndPointState epState)
+ private void handleNewJoin(InetAddress ep, EndPointState epState)
{
logger_.info("Node " + ep + " has now joined.");
/* Mark this endpoint as "live" */
@@ -637,10 +637,10 @@
doNotifications(ep, epState);
}
- synchronized void applyStateLocally(Map<EndPoint, EndPointState> epStateMap)
+ synchronized void applyStateLocally(Map<InetAddress, EndPointState> epStateMap)
{
- Set<EndPoint> eps = epStateMap.keySet();
- for( EndPoint ep : eps )
+ Set<InetAddress> eps = epStateMap.keySet();
+ for( InetAddress ep : eps )
{
if ( ep.equals( localEndPoint_ ) )
continue;
@@ -681,7 +681,7 @@
}
}
- void applyHeartBeatStateLocally(EndPoint addr, EndPointState localState, EndPointState remoteState)
+ void applyHeartBeatStateLocally(InetAddress addr, EndPointState localState, EndPointState remoteState)
{
HeartBeatState localHbState = localState.getHeartBeatState();
HeartBeatState remoteHbState = remoteState.getHeartBeatState();
@@ -703,7 +703,7 @@
}
}
- void applyApplicationStateLocally(EndPoint addr, EndPointState localStatePtr, EndPointState remoteStatePtr)
+ void applyApplicationStateLocally(InetAddress addr, EndPointState localStatePtr, EndPointState remoteStatePtr)
{
Map<String, ApplicationState> localAppStateMap = localStatePtr.getApplicationState();
Map<String, ApplicationState> remoteAppStateMap = remoteStatePtr.getApplicationState();
@@ -757,7 +757,7 @@
}
}
- void doNotifications(EndPoint addr, EndPointState epState)
+ void doNotifications(InetAddress addr, EndPointState epState)
{
for ( IEndPointStateChangeSubscriber subscriber : subscribers_ )
{
@@ -765,7 +765,7 @@
}
}
- synchronized void isAlive(EndPoint addr, EndPointState epState, boolean value)
+ synchronized void isAlive(InetAddress addr, EndPointState epState, boolean value)
{
epState.isAlive(value);
if ( value )
@@ -784,9 +784,9 @@
}
/* These are helper methods used from GossipDigestSynVerbHandler */
- Map<EndPoint, GossipDigest> getEndPointGossipDigestMap(List<GossipDigest> gDigestList)
+ Map<InetAddress, GossipDigest> getEndPointGossipDigestMap(List<GossipDigest> gDigestList)
{
- Map<EndPoint, GossipDigest> epMap = new HashMap<EndPoint, GossipDigest>();
+ Map<InetAddress, GossipDigest> epMap = new HashMap<InetAddress, GossipDigest>();
for( GossipDigest gDigest : gDigestList )
{
epMap.put( gDigest.getEndPoint(), gDigest );
@@ -795,14 +795,14 @@
}
/* This is a helper method to get all EndPoints from a list of GossipDigests */
- EndPoint[] getEndPointsFromGossipDigest(List<GossipDigest> gDigestList)
+ InetAddress[] getEndPointsFromGossipDigest(List<GossipDigest> gDigestList)
{
- Set<EndPoint> set = new HashSet<EndPoint>();
+ Set<InetAddress> set = new HashSet<InetAddress>();
for ( GossipDigest gDigest : gDigestList )
{
set.add( gDigest.getEndPoint() );
}
- return set.toArray( new EndPoint[0] );
+ return set.toArray( new InetAddress[0] );
}
/* Request all the state for the endpoint in the gDigest */
@@ -813,7 +813,7 @@
}
/* Send all the data with version greater than maxRemoteVersion */
- void sendAll(GossipDigest gDigest, Map<EndPoint, EndPointState> deltaEpStateMap, int maxRemoteVersion)
+ void sendAll(GossipDigest gDigest, Map<InetAddress, EndPointState> deltaEpStateMap, int maxRemoteVersion)
{
EndPointState localEpStatePtr = getStateForVersionBiggerThan(gDigest.getEndPoint(), maxRemoteVersion) ;
if ( localEpStatePtr != null )
@@ -824,7 +824,7 @@
This method is used to figure the state that the Gossiper has but Gossipee doesn't. The delta digests
and the delta state are built up.
*/
- synchronized void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<EndPoint, EndPointState> deltaEpStateMap)
+ synchronized void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndPointState> deltaEpStateMap)
{
for ( GossipDigest gDigest : gDigestList )
{
@@ -887,16 +887,14 @@
* Start the gossiper with the generation # retrieved from the System
* table
*/
- public void start(EndPoint localEndPoint, int generationNbr) throws IOException
+ public void start(InetAddress localEndPoint, int generationNbr) throws IOException
{
localEndPoint_ = localEndPoint;
/* Get the seeds from the config and initialize them. */
- Set<String> seedHosts = DatabaseDescriptor.getSeeds();
- for( String seedHost : seedHosts )
+ Set<InetAddress> seedHosts = DatabaseDescriptor.getSeeds();
+ for (InetAddress seed : seedHosts)
{
- EndPoint seed = new EndPoint(InetAddress.getByName(seedHost).getHostAddress(),
- DatabaseDescriptor.getControlPort());
- if ( seed.equals(localEndPoint) )
+ if (seed.equals(localEndPoint))
continue;
seeds_.add(seed);
}
@@ -943,7 +941,7 @@
public void doVerb(Message message)
{
- EndPoint from = message.getFrom();
+ InetAddress from = message.getFrom();
if (logger_.isDebugEnabled())
logger_.debug("Received a JoinMessage from " + from);
@@ -972,7 +970,7 @@
public void doVerb(Message message)
{
- EndPoint from = message.getFrom();
+ InetAddress from = message.getFrom();
if (logger_.isTraceEnabled())
logger_.trace("Received a GossipDigestSynMessage from " + from);
@@ -993,7 +991,7 @@
doSort(gDigestList);
List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
- Map<EndPoint, EndPointState> deltaEpStateMap = new HashMap<EndPoint, EndPointState>();
+ Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>();
Gossiper.instance().examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
@@ -1011,14 +1009,14 @@
/*
* First construct a map whose key is the endpoint in the GossipDigest and the value is the
* GossipDigest itself. Then build a list of version differences i.e difference between the
- * version in the GossipDigest and the version in the local state for a given EndPoint.
+ * version in the GossipDigest and the version in the local state for a given InetAddress.
* Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding
* to the endpoint from the map that was initially constructed.
*/
private void doSort(List<GossipDigest> gDigestList)
{
/* Construct a map of endpoint to GossipDigest. */
- Map<EndPoint, GossipDigest> epToDigestMap = new HashMap<EndPoint, GossipDigest>();
+ Map<InetAddress, GossipDigest> epToDigestMap = new HashMap<InetAddress, GossipDigest>();
for ( GossipDigest gDigest : gDigestList )
{
epToDigestMap.put(gDigest.getEndPoint(), gDigest);
@@ -1031,7 +1029,7 @@
List<GossipDigest> diffDigests = new ArrayList<GossipDigest>();
for ( GossipDigest gDigest : gDigestList )
{
- EndPoint ep = gDigest.getEndPoint();
+ InetAddress ep = gDigest.getEndPoint();
EndPointState epState = Gossiper.instance().getEndPointStateForEndPoint(ep);
int version = (epState != null) ? Gossiper.instance().getMaxEndPointStateVersion( epState ) : 0;
int diffVersion = Math.abs(version - gDigest.getMaxVersion() );
@@ -1058,7 +1056,7 @@
public void doVerb(Message message)
{
- EndPoint from = message.getFrom();
+ InetAddress from = message.getFrom();
if (logger_.isTraceEnabled())
logger_.trace("Received a GossipDigestAckMessage from " + from);
@@ -1069,7 +1067,7 @@
{
GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis);
List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
- Map<EndPoint, EndPointState> epStateMap = gDigestAckMessage.getEndPointStateMap();
+ Map<InetAddress, EndPointState> epStateMap = gDigestAckMessage.getEndPointStateMap();
if ( epStateMap.size() > 0 )
{
@@ -1079,10 +1077,10 @@
}
/* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
- Map<EndPoint, EndPointState> deltaEpStateMap = new HashMap<EndPoint, EndPointState>();
+ Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>();
for( GossipDigest gDigest : gDigestList )
{
- EndPoint addr = gDigest.getEndPoint();
+ InetAddress addr = gDigest.getEndPoint();
EndPointState localEpStatePtr = Gossiper.instance().getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
if ( localEpStatePtr != null )
deltaEpStateMap.put(addr, localEpStatePtr);
@@ -1107,7 +1105,7 @@
public void doVerb(Message message)
{
- EndPoint from = message.getFrom();
+ InetAddress from = message.getFrom();
if (logger_.isTraceEnabled())
logger_.trace("Received a GossipDigestAck2Message from " + from);
@@ -1122,7 +1120,7 @@
{
throw new RuntimeException(e);
}
- Map<EndPoint, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
+ Map<InetAddress, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
/* Notify the Failure Detector */
Gossiper.instance().notifyFailureDetector(remoteEpStateMap);
Gossiper.instance().applyStateLocally(remoteEpStateMap);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java Wed Oct 21 18:26:02 2009
@@ -18,7 +18,7 @@
package org.apache.cassandra.gms;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
/**
* This is called by an instance of the IEndPointStateChangePublisher to notify
@@ -38,5 +38,5 @@
* @param endpoint endpoint for which the state change occurred.
* @param epState state that actually changed for the above endpoint.
*/
- public void onChange(EndPoint endpoint, EndPointState epState);
+ public void onChange(InetAddress endpoint, EndPointState epState);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java Wed Oct 21 18:26:02 2009
@@ -18,7 +18,7 @@
package org.apache.cassandra.gms;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
/**
* Implemented by the Gossiper to either convict/suspect an endpoint
@@ -32,11 +32,11 @@
* Convict the specified endpoint.
* @param ep endpoint to be convicted
*/
- public void convict(EndPoint ep);
+ public void convict(InetAddress ep);
/**
* Suspect the specified endpoint.
* @param ep endpoint to be suspected.
*/
- public void suspect(EndPoint ep);
+ public void suspect(InetAddress ep);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java Wed Oct 21 18:26:02 2009
@@ -18,7 +18,7 @@
package org.apache.cassandra.gms;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
/**
* An interface that provides an application with the ability
@@ -36,7 +36,7 @@
* @param ep endpoint in question.
* @return true if UP and false if DOWN.
*/
- public boolean isAlive(EndPoint ep);
+ public boolean isAlive(InetAddress ep);
/**
* This method is invoked by any entity wanting to interrogate the status of an endpoint.
@@ -45,7 +45,7 @@
*
* param ep endpoint for which we interpret the inter arrival times.
*/
- public void interpret(EndPoint ep);
+ public void interpret(InetAddress ep);
/**
* This method is invoked by the receiver of the heartbeat. In our case it would be
@@ -54,7 +54,7 @@
*
* param ep endpoint being reported.
*/
- public void report(EndPoint ep);
+ public void report(InetAddress ep);
/**
* Register interest for Failure Detector events.
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureNotification.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureNotification.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureNotification.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureNotification.java Wed Oct 21 18:26:02 2009
@@ -18,10 +18,10 @@
package org.apache.cassandra.gms;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
public interface IFailureNotification
{
- public void suspect(EndPoint ep);
- public void revive(EndPoint ep);
+ public void suspect(InetAddress ep);
+ public void revive(InetAddress ep);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Wed Oct 21 18:26:02 2009
@@ -26,8 +26,9 @@
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.StorageService;
+import java.net.InetAddress;
+
+import org.apache.cassandra.utils.FBUtilities;
/**
* This class contains a helper method that will be used by
@@ -51,9 +52,9 @@
storagePort_ = storagePort;
}
- public abstract EndPoint[] getReadStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap);
+ public abstract InetAddress[] getReadStorageEndPoints(Token token, Map<Token, InetAddress> tokenToEndPointMap);
- public EndPoint[] getReadStorageEndPoints(Token token)
+ public InetAddress[] getReadStorageEndPoints(Token token)
{
return getReadStorageEndPoints(token, tokenMetadata_.cloneTokenEndPointMap());
}
@@ -63,7 +64,7 @@
* on which the data is being placed and the value is the
* endpoint to which it should be forwarded.
*/
- public Map<EndPoint, EndPoint> getHintedStorageEndPoints(Token token, EndPoint[] naturalEndpoints)
+ public Map<InetAddress, InetAddress> getHintedStorageEndPoints(Token token, InetAddress[] naturalEndpoints)
{
return getHintedMapForEndpoints(getWriteStorageEndPoints(token, naturalEndpoints));
}
@@ -76,14 +77,14 @@
*
* Only ReplicationStrategy should care about this method (higher level users should only ask for Hinted).
*/
- protected EndPoint[] getWriteStorageEndPoints(Token token, EndPoint[] naturalEndpoints)
+ public InetAddress[] getWriteStorageEndPoints(Token token, InetAddress[] naturalEndpoints)
{
- Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- Map<Token, EndPoint> bootstrapTokensToEndpointMap = tokenMetadata_.cloneBootstrapNodes();
- ArrayList<EndPoint> list = new ArrayList<EndPoint>(Arrays.asList(naturalEndpoints));
+ Map<Token, InetAddress> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ Map<Token, InetAddress> bootstrapTokensToEndpointMap = tokenMetadata_.cloneBootstrapNodes();
+ ArrayList<InetAddress> list = new ArrayList<InetAddress>(Arrays.asList(getReadStorageEndPoints(token, tokenToEndPointMap)));
for (Token t : bootstrapTokensToEndpointMap.keySet())
{
- EndPoint ep = bootstrapTokensToEndpointMap.get(t);
+ InetAddress ep = bootstrapTokensToEndpointMap.get(t);
tokenToEndPointMap.put(t, ep);
try
{
@@ -101,29 +102,15 @@
tokenToEndPointMap.remove(t);
}
}
- return retrofitPorts(list).toArray(new EndPoint[list.size()]);
- }
-
- /*
- * This method changes the ports of the endpoints from
- * the control port to the storage ports.
- */
- public List<EndPoint> retrofitPorts(List<EndPoint> eps)
- {
- List<EndPoint> retrofitted = new ArrayList<EndPoint>();
- for ( EndPoint ep : eps )
- {
- retrofitted.add(new EndPoint(ep.getHost(), ep.getPort()));
- }
- return retrofitted;
+ return list.toArray(new InetAddress[list.size()]);
}
- private Map<EndPoint, EndPoint> getHintedMapForEndpoints(EndPoint[] topN)
+ private Map<InetAddress, InetAddress> getHintedMapForEndpoints(InetAddress[] topN)
{
- Set<EndPoint> usedEndpoints = new HashSet<EndPoint>();
- Map<EndPoint, EndPoint> map = new HashMap<EndPoint, EndPoint>();
+ Set<InetAddress> usedEndpoints = new HashSet<InetAddress>();
+ Map<InetAddress, InetAddress> map = new HashMap<InetAddress, InetAddress>();
- for (EndPoint ep : topN)
+ for (InetAddress ep : topN)
{
if (FailureDetector.instance().isAlive(ep))
{
@@ -133,8 +120,8 @@
else
{
// find another endpoint to store a hint on. prefer endpoints that aren't already in use
- EndPoint hintLocation = null;
- Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ InetAddress hintLocation = null;
+ Map<Token, InetAddress> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
List tokens = new ArrayList(tokenToEndPointMap.keySet());
Collections.sort(tokens);
Token token = tokenMetadata_.getToken(ep);
@@ -149,7 +136,7 @@
int startIndex = (index + 1) % totalNodes;
for (int i = startIndex, count = 1; count < totalNodes; ++count, i = (i + 1) % totalNodes)
{
- EndPoint tmpEndPoint = tokenToEndPointMap.get(tokens.get(i));
+ InetAddress tmpEndPoint = tokenToEndPointMap.get(tokens.get(i));
if (FailureDetector.instance().isAlive(tmpEndPoint) && !Arrays.asList(topN).contains(tmpEndPoint) && !usedEndpoints.contains(tmpEndPoint))
{
hintLocation = tmpEndPoint;
@@ -158,7 +145,7 @@
}
// if all endpoints are already in use, might as well store it locally to save the network trip
if (hintLocation == null)
- hintLocation = StorageService.getLocalControlEndPoint();
+ hintLocation = FBUtilities.getLocalAddress();
map.put(hintLocation, ep);
usedEndpoints.add(hintLocation);
@@ -169,11 +156,11 @@
// TODO this is pretty inefficient.
// fixing this probably requires merging tokenmetadata into replicationstrategy, so we can cache/invalidate cleanly
- protected Map<EndPoint, Set<Range>> getRangeMap(Map<Token, EndPoint> tokenMap)
+ protected Map<InetAddress, Set<Range>> getRangeMap(Map<Token, InetAddress> tokenMap)
{
- Map<EndPoint, Set<Range>> map = new HashMap<EndPoint, Set<Range>>();
+ Map<InetAddress, Set<Range>> map = new HashMap<InetAddress, Set<Range>>();
- for (EndPoint ep : tokenMap.values())
+ for (InetAddress ep : tokenMap.values())
{
map.put(ep, new HashSet<Range>());
}
@@ -181,7 +168,7 @@
for (Token token : tokenMap.keySet())
{
Range range = getPrimaryRangeFor(token, tokenMap);
- for (EndPoint ep : getReadStorageEndPoints(token, tokenMap))
+ for (InetAddress ep : getReadStorageEndPoints(token, tokenMap))
{
map.get(ep).add(range);
}
@@ -190,17 +177,17 @@
return map;
}
- public Map<EndPoint, Set<Range>> getRangeMap()
+ public Map<InetAddress, Set<Range>> getRangeMap()
{
return getRangeMap(tokenMetadata_.cloneTokenEndPointMap());
}
- public Range getPrimaryRangeFor(Token right, Map<Token, EndPoint> tokenToEndPointMap)
+ public Range getPrimaryRangeFor(Token right, Map<Token, InetAddress> tokenToEndPointMap)
{
return new Range(getPredecessor(right, tokenToEndPointMap), right);
}
- public Token getPredecessor(Token token, Map<Token, EndPoint> tokenToEndPointMap)
+ public Token getPredecessor(Token token, Map<Token, InetAddress> tokenToEndPointMap)
{
List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
Collections.sort(tokens);
@@ -208,7 +195,7 @@
return (Token) (index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(--index));
}
- public Token getSuccessor(Token token, Map<Token, EndPoint> tokenToEndPointMap)
+ public Token getSuccessor(Token token, Map<Token, InetAddress> tokenToEndPointMap)
{
List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
Collections.sort(tokens);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java Wed Oct 21 18:26:02 2009
@@ -20,11 +20,11 @@
import java.net.*;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
public class EndPointSnitch implements IEndPointSnitch
{
- public boolean isOnSameRack(EndPoint host, EndPoint host2) throws UnknownHostException
+ public boolean isOnSameRack(InetAddress host, InetAddress host2) throws UnknownHostException
{
/*
* Look at the IP Address of the two hosts. Compare
@@ -37,7 +37,7 @@
return ( ip[2] == ip2[2] );
}
- public boolean isInSameDataCenter(EndPoint host, EndPoint host2) throws UnknownHostException
+ public boolean isInSameDataCenter(InetAddress host, InetAddress host2) throws UnknownHostException
{
/*
* Look at the IP Address of the two hosts. Compare
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java Wed Oct 21 18:26:02 2009
@@ -20,7 +20,7 @@
import java.net.UnknownHostException;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
/**
@@ -38,7 +38,7 @@
* @return true if on the same rack false otherwise
* @throws UnknownHostException
*/
- public boolean isOnSameRack(EndPoint host, EndPoint host2) throws UnknownHostException;
+ public boolean isOnSameRack(InetAddress host, InetAddress host2) throws UnknownHostException;
/**
* Helps determine if 2 nodes are in the same data center.
@@ -47,5 +47,5 @@
* @return true if in the same data center false otherwise
* @throws UnknownHostException
*/
- public boolean isInSameDataCenter(EndPoint host, EndPoint host2) throws UnknownHostException;
+ public boolean isInSameDataCenter(InetAddress host, InetAddress host2) throws UnknownHostException;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java Wed Oct 21 18:26:02 2009
@@ -26,7 +26,7 @@
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
@@ -44,10 +44,10 @@
super(tokenMetadata, partitioner, replicas, storagePort);
}
- public EndPoint[] getReadStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
+ public InetAddress[] getReadStorageEndPoints(Token token, Map<Token, InetAddress> tokenToEndPointMap)
{
int startIndex;
- List<EndPoint> list = new ArrayList<EndPoint>();
+ List<InetAddress> list = new ArrayList<InetAddress>();
boolean bDataCenter = false;
boolean bOtherRack = false;
int foundCount = 0;
@@ -66,7 +66,7 @@
foundCount++;
if( replicas_ == 1 )
{
- return list.toArray(new EndPoint[list.size()]);
+ return list.toArray(new InetAddress[list.size()]);
}
startIndex = (index + 1)%totalNodes;
IEndPointSnitch endPointSnitch = StorageService.instance().getEndPointSnitch();
@@ -117,6 +117,6 @@
foundCount++;
}
}
- return retrofitPorts(list).toArray(new EndPoint[list.size()]);
+ return list.toArray(new InetAddress[list.size()]);
}
}
\ No newline at end of file
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java Wed Oct 21 18:26:02 2009
@@ -25,7 +25,7 @@
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
/**
* This class returns the nodes responsible for a given
@@ -40,7 +40,7 @@
super(tokenMetadata, partitioner, replicas, storagePort);
}
- public EndPoint[] getReadStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
+ public InetAddress[] getReadStorageEndPoints(Token token, Map<Token, InetAddress> tokenToEndPointMap)
{
int startIndex;
List<Token> tokenList = new ArrayList<Token>();
@@ -74,9 +74,9 @@
foundCount++;
}
}
- List<EndPoint> list = new ArrayList<EndPoint>();
+ List<InetAddress> list = new ArrayList<InetAddress>();
for (Token t: tokenList)
list.add(tokenToEndPointMap.get(t));
- return retrofitPorts(list).toArray(new EndPoint[list.size()]);
+ return list.toArray(new InetAddress[list.size()]);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Wed Oct 21 18:26:02 2009
@@ -23,7 +23,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.service.UnavailableException;
@@ -31,23 +31,23 @@
public class TokenMetadata
{
/* Maintains token to endpoint map of every node in the cluster. */
- private Map<Token, EndPoint> tokenToEndPointMap_;
+ private Map<Token, InetAddress> tokenToEndPointMap_;
/* Maintains a reverse index of endpoint to token in the cluster. */
- private Map<EndPoint, Token> endPointToTokenMap_;
+ private Map<InetAddress, Token> endPointToTokenMap_;
/* Bootstrapping nodes and their tokens */
- private Map<Token, EndPoint> bootstrapNodes;
+ private Map<Token, InetAddress> bootstrapNodes;
/* Use this lock for manipulating the token map */
private final ReadWriteLock lock_ = new ReentrantReadWriteLock(true);
public TokenMetadata()
{
- tokenToEndPointMap_ = new HashMap<Token, EndPoint>();
- endPointToTokenMap_ = new HashMap<EndPoint, Token>();
- this.bootstrapNodes = Collections.synchronizedMap(new HashMap<Token, EndPoint>());
+ tokenToEndPointMap_ = new HashMap<Token, InetAddress>();
+ endPointToTokenMap_ = new HashMap<InetAddress, Token>();
+ this.bootstrapNodes = Collections.synchronizedMap(new HashMap<Token, InetAddress>());
}
- public TokenMetadata(Map<Token, EndPoint> tokenToEndPointMap, Map<EndPoint, Token> endPointToTokenMap, Map<Token, EndPoint> bootstrapNodes)
+ public TokenMetadata(Map<Token, InetAddress> tokenToEndPointMap, Map<InetAddress, Token> endPointToTokenMap, Map<Token, InetAddress> bootstrapNodes)
{
tokenToEndPointMap_ = tokenToEndPointMap;
endPointToTokenMap_ = endPointToTokenMap;
@@ -59,14 +59,14 @@
return new TokenMetadata(cloneTokenEndPointMap(), cloneEndPointTokenMap(), cloneBootstrapNodes());
}
- public void update(Token token, EndPoint endpoint)
+ public void update(Token token, InetAddress endpoint)
{
this.update(token, endpoint, false);
}
/**
* Update the two maps in an safe mode.
*/
- public void update(Token token, EndPoint endpoint, boolean bootstrapState)
+ public void update(Token token, InetAddress endpoint, boolean bootstrapState)
{
lock_.writeLock().lock();
try
@@ -96,7 +96,7 @@
* Remove the entries in the two maps.
* @param endpoint
*/
- public void remove(EndPoint endpoint)
+ public void remove(InetAddress endpoint)
{
lock_.writeLock().lock();
try
@@ -112,7 +112,7 @@
}
}
- public Token getToken(EndPoint endpoint)
+ public Token getToken(InetAddress endpoint)
{
lock_.readLock().lock();
try
@@ -125,7 +125,7 @@
}
}
- public boolean isKnownEndPoint(EndPoint ep)
+ public boolean isKnownEndPoint(InetAddress ep)
{
lock_.readLock().lock();
try
@@ -138,7 +138,7 @@
}
}
- public EndPoint getFirstEndpoint()
+ public InetAddress getFirstEndpoint()
{
lock_.readLock().lock();
try
@@ -156,7 +156,7 @@
}
- public EndPoint getNextEndpoint(EndPoint endPoint) throws UnavailableException
+ public InetAddress getNextEndpoint(InetAddress endPoint) throws UnavailableException
{
lock_.readLock().lock();
try
@@ -167,7 +167,7 @@
Collections.sort(tokens);
int i = tokens.indexOf(endPointToTokenMap_.get(endPoint)); // TODO binary search
int j = 1;
- EndPoint ep;
+ InetAddress ep;
while (!FailureDetector.instance().isAlive((ep = tokenToEndPointMap_.get(tokens.get((i + j) % tokens.size())))))
{
if (++j > DatabaseDescriptor.getReplicationFactor())
@@ -183,12 +183,12 @@
}
}
- public Map<Token, EndPoint> cloneBootstrapNodes()
+ public Map<Token, InetAddress> cloneBootstrapNodes()
{
lock_.readLock().lock();
try
{
- return new HashMap<Token, EndPoint>( bootstrapNodes );
+ return new HashMap<Token, InetAddress>( bootstrapNodes );
}
finally
{
@@ -200,12 +200,12 @@
/*
* Returns a safe clone of tokenToEndPointMap_.
*/
- public Map<Token, EndPoint> cloneTokenEndPointMap()
+ public Map<Token, InetAddress> cloneTokenEndPointMap()
{
lock_.readLock().lock();
try
{
- return new HashMap<Token, EndPoint>( tokenToEndPointMap_ );
+ return new HashMap<Token, InetAddress>( tokenToEndPointMap_ );
}
finally
{
@@ -216,12 +216,12 @@
/*
* Returns a safe clone of endPointTokenMap_.
*/
- public Map<EndPoint, Token> cloneEndPointTokenMap()
+ public Map<InetAddress, Token> cloneEndPointTokenMap()
{
lock_.readLock().lock();
try
{
- return new HashMap<EndPoint, Token>( endPointToTokenMap_ );
+ return new HashMap<InetAddress, Token>( endPointToTokenMap_ );
}
finally
{
@@ -232,9 +232,9 @@
public String toString()
{
StringBuilder sb = new StringBuilder();
- Set<EndPoint> eps = endPointToTokenMap_.keySet();
+ Set<InetAddress> eps = endPointToTokenMap_.keySet();
- for ( EndPoint ep : eps )
+ for ( InetAddress ep : eps )
{
sb.append(ep);
sb.append(":");
@@ -245,7 +245,7 @@
return sb.toString();
}
- public EndPoint getEndPoint(Token token)
+ public InetAddress getEndPoint(Token token)
{
lock_.readLock().lock();
try
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndPointSerializationHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndPointSerializationHelper.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndPointSerializationHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndPointSerializationHelper.java Wed Oct 21 18:26:02 2009
@@ -19,18 +19,19 @@
package org.apache.cassandra.net;
import java.io.*;
+import java.net.InetAddress;
public class CompactEndPointSerializationHelper
{
- public static void serialize(EndPoint endPoint, DataOutputStream dos) throws IOException
+ public static void serialize(InetAddress endPoint, DataOutputStream dos) throws IOException
{
dos.write(endPoint.getAddress());
}
- public static EndPoint deserialize(DataInputStream dis) throws IOException
+ public static InetAddress deserialize(DataInputStream dis) throws IOException
{
- byte[] bytes = new byte[6];
+ byte[] bytes = new byte[4];
dis.readFully(bytes, 0, bytes.length);
- return EndPoint.getByAddress(bytes);
+ return InetAddress.getByAddress(bytes);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Wed Oct 21 18:26:02 2009
@@ -20,6 +20,7 @@
import java.io.*;
import java.net.SocketException;
+import java.net.InetAddress;
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.utils.LogUtil;
@@ -32,10 +33,10 @@
private String file_;
private long startPosition_;
private long total_;
- private EndPoint from_;
- private EndPoint to_;
+ private InetAddress from_;
+ private InetAddress to_;
- FileStreamTask(String file, long startPosition, long total, EndPoint from, EndPoint to)
+ FileStreamTask(String file, long startPosition, long total, InetAddress from, InetAddress to)
{
file_ = file;
startPosition_ = startPosition;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Wed Oct 21 18:26:02 2009
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.net.InetAddress;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.utils.GuidGenerator;
@@ -43,13 +44,13 @@
return serializer_;
}
- private EndPoint from_;
+ private InetAddress from_;
private String type_;
private String verb_;
private String messageId_;
protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
- Header(String id, EndPoint from, String messageType, String verb)
+ Header(String id, InetAddress from, String messageType, String verb)
{
messageId_ = id;
from_ = from;
@@ -57,13 +58,13 @@
verb_ = verb;
}
- Header(String id, EndPoint from, String messageType, String verb, Map<String, byte[]> details)
+ Header(String id, InetAddress from, String messageType, String verb, Map<String, byte[]> details)
{
this(id, from, messageType, verb);
details_ = details;
}
- Header(EndPoint from, String messageType, String verb)
+ Header(InetAddress from, String messageType, String verb)
{
messageId_ = Integer.toString(idGen_.incrementAndGet());
from_ = from;
@@ -71,7 +72,7 @@
verb_ = verb;
}
- EndPoint getFrom()
+ InetAddress getFrom()
{
return from_;
}
@@ -153,7 +154,7 @@
public Header deserialize(DataInputStream dis) throws IOException
{
String id = dis.readUTF();
- EndPoint from = CompactEndPointSerializationHelper.deserialize(dis);
+ InetAddress from = CompactEndPointSerializationHelper.deserialize(dis);
String type = dis.readUTF();
String verb = dis.readUTF();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java Wed Oct 21 18:26:02 2009
@@ -19,6 +19,7 @@
package org.apache.cassandra.net;
import java.io.IOException;
+import java.net.InetAddress;
import javax.xml.bind.JAXBException;
@@ -45,7 +46,7 @@
* Deregister all verbhandlers corresponding to localEndPoint.
* @param localEndPoint
*/
- public void deregisterAllVerbHandlers(EndPoint localEndPoint);
+ public void deregisterAllVerbHandlers(InetAddress localEndPoint);
/**
* Deregister a verbhandler corresponding to the verb from the
@@ -56,16 +57,16 @@
/**
* Listen on the specified port.
- * @param ep EndPoint whose port to listen on.
+ * @param ep InetAddress whose port to listen on.
* @param isHttp specify if the port is an Http port.
*/
- public void listen(EndPoint ep) throws IOException;
+ public void listen(InetAddress ep) throws IOException;
/**
* Listen on the specified port.
- * @param ep EndPoint whose port to listen on.
+ * @param ep InetAddress whose port to listen on.
*/
- public void listenUDP(EndPoint ep);
+ public void listenUDP(InetAddress ep);
/**
* Send a message to a given endpoint.
@@ -74,7 +75,7 @@
* @return an reference to an IAsyncResult which can be queried for the
* response
*/
- public IAsyncResult sendRR(Message message, EndPoint to);
+ public IAsyncResult sendRR(Message message, InetAddress to);
/**
* Send a message to the given set of endpoints and informs the MessagingService
@@ -85,7 +86,7 @@
* @param cb callback interface which is used to pass the responses
* @return an reference to message id used to match with the result
*/
- public String sendRR(Message message, EndPoint[] to, IAsyncCallback cb);
+ public String sendRR(Message message, InetAddress[] to, IAsyncCallback cb);
/**
* Send a message to a given endpoint. This method specifies a callback
@@ -97,7 +98,7 @@
* suggest that a timeout occurred to the invoker of the send().
* @return an reference to message id used to match with the result
*/
- public String sendRR(Message message, EndPoint to, IAsyncCallback cb);
+ public String sendRR(Message message, InetAddress to, IAsyncCallback cb);
/**
* Send a message to a given endpoint. The ith element in the <code>messages</code>
@@ -112,7 +113,7 @@
* suggest that a timeout occured to the invoker of the send().
* @return an reference to message id used to match with the result
*/
- public String sendRR(Message[] messages, EndPoint[] to, IAsyncCallback cb);
+ public String sendRR(Message[] messages, InetAddress[] to, IAsyncCallback cb);
/**
* Send a message to a given endpoint. This method adheres to the fire and forget
@@ -120,7 +121,7 @@
* @param message messages to be sent.
* @param to endpoint to which the message needs to be sent
*/
- public void sendOneWay(Message message, EndPoint to);
+ public void sendOneWay(Message message, InetAddress to);
/**
* Send a message to a given endpoint. This method adheres to the fire and forget
@@ -128,7 +129,7 @@
* @param message messages to be sent.
* @param to endpoint to which the message needs to be sent
*/
- public void sendUdpOneWay(Message message, EndPoint to);
+ public void sendUdpOneWay(Message message, InetAddress to);
/**
* Stream a file from source to destination. This is highly optimized
@@ -138,7 +139,7 @@
* @param total number of bytes to stream
* @param to endpoint to which we need to stream the file.
*/
- public void stream(String file, long startPosition, long total, EndPoint from, EndPoint to);
+ public void stream(String file, long startPosition, long total, InetAddress from, InetAddress to);
/**
* This method returns the verb handler associated with the registered
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Wed Oct 21 18:26:02 2009
@@ -22,6 +22,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;
+import java.net.InetAddress;
import org.apache.cassandra.io.ICompactSerializer;
@@ -52,7 +53,7 @@
body_ = body;
}
- public Message(EndPoint from, String messageType, String verb, byte[] body)
+ public Message(InetAddress from, String messageType, String verb, byte[] body)
{
this(new Header(from, messageType, verb), body);
}
@@ -92,7 +93,7 @@
return body_;
}
- public EndPoint getFrom()
+ public InetAddress getFrom()
{
return header_.getFrom();
}
@@ -117,7 +118,7 @@
header_.setMessageId(id);
}
- public Message getReply(EndPoint from, byte[] args)
+ public Message getReply(InetAddress from, byte[] args)
{
Header header = new Header(getMessageId(), from, MessagingService.responseStage_, MessagingService.responseVerbHandler_);
return new Message(header, args);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Oct 21 18:26:02 2009
@@ -28,6 +28,8 @@
import java.io.IOException;
import java.net.ServerSocket;
import java.net.SocketException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
@@ -63,10 +65,10 @@
private static ICachetable<String, IAsyncResult> taskCompletionMap_;
/* Manages the table of endpoints it is listening on */
- private static Set<EndPoint> endPoints_;
+ private static Set<InetAddress> endPoints_;
/* List of sockets we are listening on */
- private static Map<EndPoint, SelectionKey> listenSockets_ = new HashMap<EndPoint, SelectionKey>();
+ private static Map<InetAddress, SelectionKey> listenSockets_ = new HashMap<InetAddress, SelectionKey>();
/* Lookup table for registering message handlers based on the verb. */
private static Map<String, IVerbHandler> verbHandlers_;
@@ -128,7 +130,7 @@
reservedVerbs_.put(verbs.toString(), verbs.toString());
}
verbHandlers_ = new HashMap<String, IVerbHandler>();
- endPoints_ = new HashSet<EndPoint>();
+ endPoints_ = new HashSet<InetAddress>();
/*
* Leave callbacks in the cachetable long enough that any related messages will arrive
* before the callback is evicted from the table. The concurrency level is set at 128
@@ -180,11 +182,11 @@
return result;
}
- public void listen(EndPoint localEp) throws IOException
+ public void listen(InetAddress localEp) throws IOException
{
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket ss = serverChannel.socket();
- ss.bind(localEp.getInetAddress());
+ ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()));
serverChannel.configureBlocking(false);
SelectionKeyHandler handler = new TcpConnectionHandler(localEp);
@@ -194,14 +196,14 @@
listenSockets_.put(localEp, key);
}
- public void listenUDP(EndPoint localEp)
+ public void listenUDP(InetAddress localEp)
{
UdpConnection connection = new UdpConnection();
if (logger_.isDebugEnabled())
logger_.debug("Starting to listen on " + localEp);
try
{
- connection.init(localEp.getPort());
+ connection.init(localEp);
endPoints_.add(localEp);
}
catch ( IOException e )
@@ -210,7 +212,7 @@
}
}
- public static TcpConnectionManager getConnectionPool(EndPoint from, EndPoint to)
+ public static TcpConnectionManager getConnectionPool(InetAddress from, InetAddress to)
{
String key = from + ":" + to;
TcpConnectionManager cp = poolTable_.get(key);
@@ -236,7 +238,7 @@
return cp;
}
- public static TcpConnection getConnection(EndPoint from, EndPoint to) throws IOException
+ public static TcpConnection getConnection(InetAddress from, InetAddress to) throws IOException
{
return getConnectionPool(from, to).getConnection();
}
@@ -255,7 +257,7 @@
verbHandlers_.put(type, verbHandler);
}
- public void deregisterAllVerbHandlers(EndPoint localEndPoint)
+ public void deregisterAllVerbHandlers(InetAddress localEndPoint)
{
Iterator keys = verbHandlers_.keySet().iterator();
String key = null;
@@ -283,7 +285,7 @@
return handler;
}
- public String sendRR(Message message, EndPoint[] to, IAsyncCallback cb)
+ public String sendRR(Message message, InetAddress[] to, IAsyncCallback cb)
{
String messageId = message.getMessageId();
callbackMap_.put(messageId, cb);
@@ -294,7 +296,7 @@
return messageId;
}
- public String sendRR(Message message, EndPoint to, IAsyncCallback cb)
+ public String sendRR(Message message, InetAddress to, IAsyncCallback cb)
{
String messageId = message.getMessageId();
callbackMap_.put(messageId, cb);
@@ -302,7 +304,7 @@
return messageId;
}
- public String sendRR(Message[] messages, EndPoint[] to, IAsyncCallback cb)
+ public String sendRR(Message[] messages, InetAddress[] to, IAsyncCallback cb)
{
if ( messages.length != to.length )
{
@@ -321,7 +323,7 @@
/*
Use this version for fire and forget style messaging.
*/
- public void sendOneWay(Message message, EndPoint to)
+ public void sendOneWay(Message message, InetAddress to)
{
// do local deliveries
if ( message.getFrom().equals(to) )
@@ -364,7 +366,7 @@
}
}
- public IAsyncResult sendRR(Message message, EndPoint to)
+ public IAsyncResult sendRR(Message message, InetAddress to)
{
IAsyncResult iar = new AsyncResult();
taskCompletionMap_.put(message.getMessageId(), iar);
@@ -372,7 +374,7 @@
return iar;
}
- public void sendUdpOneWay(Message message, EndPoint to)
+ public void sendUdpOneWay(Message message, InetAddress to)
{
if (message.getFrom().equals(to)) {
MessagingService.receive(message);
@@ -397,7 +399,7 @@
}
}
- public void stream(String file, long startPosition, long total, EndPoint from, EndPoint to)
+ public void stream(String file, long startPosition, long total, InetAddress from, InetAddress to)
{
isStreaming_.set(true);
/* Streaming asynchronously on streamExector_ threads. */
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Wed Oct 21 18:26:02 2009
@@ -29,6 +29,8 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.io.FastSerializer;
@@ -51,8 +53,8 @@
private TcpReader tcpReader_;
private ReadWorkItem readWork_ = new ReadWorkItem();
private Queue<ByteBuffer> pendingWrites_ = new ConcurrentLinkedQueue<ByteBuffer>();
- private EndPoint localEp_;
- private EndPoint remoteEp_;
+ private InetAddress localEp_;
+ private InetAddress remoteEp_;
boolean inUse_ = false;
/*
@@ -67,7 +69,7 @@
private Condition condition_;
// used from getConnection - outgoing
- TcpConnection(TcpConnectionManager pool, EndPoint from, EndPoint to) throws IOException
+ TcpConnection(TcpConnectionManager pool, InetAddress from, InetAddress to) throws IOException
{
socketChannel_ = SocketChannel.open();
socketChannel_.configureBlocking(false);
@@ -75,8 +77,8 @@
localEp_ = from;
remoteEp_ = to;
-
- if ( !socketChannel_.connect( remoteEp_.getInetAddress() ) )
+
+ if (!socketChannel_.connect(new InetSocketAddress(remoteEp_, DatabaseDescriptor.getStoragePort())))
{
key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
}
@@ -89,7 +91,7 @@
/*
* Used for streaming purposes has no pooling semantics.
*/
- TcpConnection(EndPoint from, EndPoint to) throws IOException
+ TcpConnection(InetAddress from, InetAddress to) throws IOException
{
socketChannel_ = SocketChannel.open();
socketChannel_.configureBlocking(false);
@@ -97,7 +99,7 @@
localEp_ = from;
remoteEp_ = to;
- if ( !socketChannel_.connect( remoteEp_.getInetAddress() ) )
+ if (!socketChannel_.connect(new InetSocketAddress(remoteEp_, DatabaseDescriptor.getStoragePort())))
{
key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
}
@@ -114,7 +116,7 @@
* This method is invoked by the TcpConnectionHandler to accept incoming TCP connections.
* Accept the connection and then register interest for reads.
*/
- static void acceptConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
+ static void acceptConnection(SocketChannel socketChannel, InetAddress localEp, boolean isIncoming) throws IOException
{
TcpConnection tcpConnection = new TcpConnection(socketChannel, localEp, true);
tcpConnection.registerReadInterest();
@@ -126,7 +128,7 @@
}
// used for incoming connections
- TcpConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
+ TcpConnection(SocketChannel socketChannel, InetAddress localEp, boolean isIncoming) throws IOException
{
socketChannel_ = socketChannel;
socketChannel_.configureBlocking(false);
@@ -134,17 +136,17 @@
localEp_ = localEp;
}
- EndPoint getLocalEp()
+ InetAddress getLocalEp()
{
return localEp_;
}
- public void setLocalEp(EndPoint localEp)
+ public void setLocalEp(InetAddress localEp)
{
localEp_ = localEp;
}
- public EndPoint getEndPoint()
+ public InetAddress getEndPoint()
{
return remoteEp_;
}
@@ -453,7 +455,7 @@
/* first message received */
if (remoteEp_ == null)
{
- remoteEp_ = new EndPoint(socketChannel_.socket().getInetAddress().getHostAddress(), DatabaseDescriptor.getStoragePort());
+ remoteEp_ = socketChannel_.socket().getInetAddress();
// put connection into pool if possible
pool_ = MessagingService.getConnectionPool(localEp_, remoteEp_);
pool_.addToPool(TcpConnection.this);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java Wed Oct 21 18:26:02 2009
@@ -28,9 +28,9 @@
public class TcpConnectionHandler extends SelectionKeyHandler
{
private static Logger logger_ = Logger.getLogger(TcpConnectionHandler.class);
- EndPoint localEp_;
+ InetAddress localEp_;
- public TcpConnectionHandler(EndPoint localEp)
+ public TcpConnectionHandler(InetAddress localEp)
{
localEp_ = localEp;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java Wed Oct 21 18:26:02 2009
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.*;
import java.util.concurrent.locks.*;
+import java.net.InetAddress;
import org.apache.log4j.Logger;
@@ -28,15 +29,15 @@
{
private Lock lock_ = new ReentrantLock();
private List<TcpConnection> allConnections_;
- private EndPoint localEp_;
- private EndPoint remoteEp_;
+ private InetAddress localEp_;
+ private InetAddress remoteEp_;
private int maxSize_;
private int inUse_;
// TODO! this whole thing is a giant no-op, since "contains" only relies on TcpConnection.equals, which
// is true for any (local, remote) pairs. So there is only ever at most one TcpConnection per Manager!
- TcpConnectionManager(int initialSize, int growthFactor, int maxSize, EndPoint localEp, EndPoint remoteEp)
+ TcpConnectionManager(int initialSize, int growthFactor, int maxSize, InetAddress localEp, InetAddress remoteEp)
{
maxSize_ = maxSize;
localEp_ = localEp;
@@ -187,12 +188,12 @@
}
}
- EndPoint getLocalEndPoint()
+ InetAddress getLocalEndPoint()
{
return localEp_;
}
- EndPoint getRemoteEndPoint()
+ InetAddress getRemoteEndPoint()
{
return remoteEp_;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java Wed Oct 21 18:26:02 2009
@@ -19,23 +19,22 @@
package org.apache.cassandra.net;
import java.net.SocketAddress;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.nio.*;
import java.nio.channels.*;
-import java.util.*;
-import java.util.concurrent.*;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import org.apache.cassandra.net.io.ProtocolState;
-import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.utils.BasicUtilities;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
-import org.apache.cassandra.concurrent.*;
+
import org.apache.cassandra.utils.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
public class UdpConnection extends SelectionKeyHandler
{
@@ -44,8 +43,7 @@
private static final int protocol_ = 0xBADBEEF;
private DatagramChannel socketChannel_;
- private SelectionKey key_;
- private EndPoint localEndPoint_;
+ private SelectionKey key_;
public void init() throws IOException
{
@@ -54,17 +52,16 @@
socketChannel_.configureBlocking(false);
}
- public void init(int port) throws IOException
+ public void init(InetAddress localEp) throws IOException
{
- localEndPoint_ = new EndPoint(FBUtilities.getHostAddress(), port);
socketChannel_ = DatagramChannel.open();
- socketChannel_.socket().bind(localEndPoint_.getInetAddress());
+ socketChannel_.socket().bind(new InetSocketAddress(localEp, DatabaseDescriptor.getControlPort()));
socketChannel_.socket().setReuseAddress(true);
socketChannel_.configureBlocking(false);
key_ = SelectorManager.getUdpSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
}
- public boolean write(Message message, EndPoint to) throws IOException
+ public boolean write(Message message, InetAddress to) throws IOException
{
boolean bVal = true;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -81,7 +78,7 @@
buffer.put(data);
buffer.flip();
- int n = socketChannel_.send(buffer, to.getInetAddress());
+ int n = socketChannel_.send(buffer, new InetSocketAddress(to, DatabaseDescriptor.getControlPort()));
if ( n == 0 )
{
bVal = false;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java Wed Oct 21 18:26:02 2009
@@ -19,6 +19,7 @@
package org.apache.cassandra.net.io;
import java.net.InetSocketAddress;
+import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
@@ -43,9 +44,8 @@
super(stream);
SocketChannel socketChannel = stream.getStream();
InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
- String remoteHost = remoteAddress.getAddress().getHostAddress();
- streamContext_ = StreamContextManager.getStreamContext(remoteHost);
- streamStatus_ = StreamContextManager.getStreamStatus(remoteHost);
+ streamContext_ = StreamContextManager.getStreamContext(remoteAddress.getAddress());
+ streamStatus_ = StreamContextManager.getStreamStatus(remoteAddress.getAddress());
}
private void createFileChannel() throws IOException
@@ -63,7 +63,6 @@
{
SocketChannel socketChannel = stream_.getStream();
InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
- String remoteHostIp = remoteAddress.getAddress().getHostAddress();
createFileChannel();
if ( streamContext_ != null )
{
@@ -77,7 +76,7 @@
{
/* Ask the source node to re-stream this file. */
streamStatus_.setAction(StreamContextManager.StreamCompletionAction.STREAM);
- handleStreamCompletion(remoteHostIp);
+ handleStreamCompletion(remoteAddress.getAddress());
/* Delete the orphaned file. */
File file = new File(streamContext_.getTargetFile());
file.delete();
@@ -87,7 +86,7 @@
{
if (logger_.isDebugEnabled())
logger_.debug("Removing stream context " + streamContext_);
- handleStreamCompletion(remoteHostIp);
+ handleStreamCompletion(remoteAddress.getAddress());
bytesRead_ = 0L;
fc_.close();
morphState();
@@ -97,7 +96,7 @@
return new byte[0];
}
- private void handleStreamCompletion(String remoteHost) throws IOException
+ private void handleStreamCompletion(InetAddress remoteHost) throws IOException
{
/*
* Streaming is complete. If all the data that has to be received inform the sender via
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java Wed Oct 21 18:26:02 2009
@@ -20,7 +20,7 @@
import java.io.IOException;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
public interface IStreamComplete
{
@@ -28,5 +28,5 @@
* This callback if registered with the StreamContextManager is
* called when the stream from a host is completely handled.
*/
- public void onStreamCompletion(String from, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException;
+ public void onStreamCompletion(InetAddress from, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException;
}