You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/21 20:26:04 UTC

svn commit: r828130 [2/3] - in /incubator/cassandra/trunk: contrib/bmt_example/ contrib/property_snitch/src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/client/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/...

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java Wed Oct 21 18:26:02 2009
@@ -23,7 +23,7 @@
 import java.io.IOException;
 import java.util.*;
 import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.net.*;
 
 
@@ -40,19 +40,19 @@
         serializer_ = new GossipDigestAck2MessageSerializer();
     }
     
-    Map<EndPoint, EndPointState> epStateMap_ = new HashMap<EndPoint, EndPointState>();
+    Map<InetAddress, EndPointState> epStateMap_ = new HashMap<InetAddress, EndPointState>();
 
     public static ICompactSerializer<GossipDigestAck2Message> serializer()
     {
         return serializer_;
     }
     
-    GossipDigestAck2Message(Map<EndPoint, EndPointState> epStateMap)
+    GossipDigestAck2Message(Map<InetAddress, EndPointState> epStateMap)
     {
         epStateMap_ = epStateMap;
     }
         
-    Map<EndPoint, EndPointState> getEndPointStateMap()
+    Map<InetAddress, EndPointState> getEndPointStateMap()
     {
          return epStateMap_;
     }
@@ -68,7 +68,7 @@
 
     public GossipDigestAck2Message deserialize(DataInputStream dis) throws IOException
     {
-        Map<EndPoint, EndPointState> epStateMap = EndPointStatesSerializationHelper.deserialize(dis);
+        Map<InetAddress, EndPointState> epStateMap = EndPointStatesSerializationHelper.deserialize(dis);
         return new GossipDigestAck2Message(epStateMap);        
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java Wed Oct 21 18:26:02 2009
@@ -24,7 +24,7 @@
 import java.util.*;
 
 import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 
 
 
@@ -42,20 +42,20 @@
     }
     
     List<GossipDigest> gDigestList_ = new ArrayList<GossipDigest>();
-    Map<EndPoint, EndPointState> epStateMap_ = new HashMap<EndPoint, EndPointState>();
+    Map<InetAddress, EndPointState> epStateMap_ = new HashMap<InetAddress, EndPointState>();
     
     static ICompactSerializer<GossipDigestAckMessage> serializer()
     {
         return serializer_;
     }
     
-    GossipDigestAckMessage(List<GossipDigest> gDigestList, Map<EndPoint, EndPointState> epStateMap)
+    GossipDigestAckMessage(List<GossipDigest> gDigestList, Map<InetAddress, EndPointState> epStateMap)
     {
         gDigestList_ = gDigestList;
         epStateMap_ = epStateMap;
     }
     
-    void addGossipDigest(EndPoint ep, int generation, int version)
+    void addGossipDigest(InetAddress ep, int generation, int version)
     {
         gDigestList_.add( new GossipDigest(ep, generation, version) );
     }
@@ -65,7 +65,7 @@
         return gDigestList_;
     }
     
-    Map<EndPoint, EndPointState> getEndPointStateMap()
+    Map<InetAddress, EndPointState> getEndPointStateMap()
     {
         return epStateMap_;
     }
@@ -87,7 +87,7 @@
 
     public GossipDigestAckMessage deserialize(DataInputStream dis) throws IOException
     {
-        Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>();
+        Map<InetAddress, EndPointState> epStateMap = new HashMap<InetAddress, EndPointState>();
         List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis);                
         boolean bContinue = dis.readBoolean();
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java Wed Oct 21 18:26:02 2009
@@ -25,7 +25,7 @@
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.utils.Log4jLogger;
 import org.apache.log4j.Logger;
 import org.apache.cassandra.utils.*;
@@ -115,15 +115,15 @@
 {
     private static Log4jLogger logger_ = new Log4jLogger(EndPointStatesSerializationHelper.class.getName());
     
-    static boolean serialize(Map<EndPoint, EndPointState> epStateMap, DataOutputStream dos) throws IOException
+    static boolean serialize(Map<InetAddress, EndPointState> epStateMap, DataOutputStream dos) throws IOException
     {
         boolean bVal = true;
         int estimate = 0;                
         int size = epStateMap.size();
         dos.writeInt(size);
     
-        Set<EndPoint> eps = epStateMap.keySet();
-        for( EndPoint ep : eps )
+        Set<InetAddress> eps = epStateMap.keySet();
+        for( InetAddress ep : eps )
         {
             if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
             {
@@ -142,10 +142,10 @@
         return bVal;
     }
 
-    static Map<EndPoint, EndPointState> deserialize(DataInputStream dis) throws IOException
+    static Map<InetAddress, EndPointState> deserialize(DataInputStream dis) throws IOException
     {
         int size = dis.readInt();            
-        Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>();
+        Map<InetAddress, EndPointState> epStateMap = new HashMap<InetAddress, EndPointState>();
         
         for ( int i = 0; i < size; ++i )
         {
@@ -155,7 +155,7 @@
                 break;
             }
             // int length = dis.readInt();            
-            EndPoint ep = CompactEndPointSerializationHelper.deserialize(dis);
+            InetAddress ep = CompactEndPointSerializationHelper.deserialize(dis);
             EndPointState epState = EndPointState.serializer().deserialize(dis);            
             epStateMap.put(ep, epState);
         }        

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Wed Oct 21 18:26:02 2009
@@ -25,7 +25,7 @@
 import org.apache.cassandra.concurrent.SingleThreadedStage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -109,7 +109,7 @@
     }
 
     private Timer gossipTimer_ = new Timer(false);
-    private EndPoint localEndPoint_;
+    private InetAddress localEndPoint_;
     private long aVeryLongTime_;
     private Random random_ = new Random();
     /* round robin index through live endpoint set */
@@ -119,16 +119,16 @@
     private List<IEndPointStateChangeSubscriber> subscribers_ = new ArrayList<IEndPointStateChangeSubscriber>();
 
     /* live member set */
-    private Set<EndPoint> liveEndpoints_ = new HashSet<EndPoint>();
+    private Set<InetAddress> liveEndpoints_ = new HashSet<InetAddress>();
 
     /* unreachable member set */
-    private Set<EndPoint> unreachableEndpoints_ = new HashSet<EndPoint>();
+    private Set<InetAddress> unreachableEndpoints_ = new HashSet<InetAddress>();
 
     /* initial seeds for joining the cluster */
-    private Set<EndPoint> seeds_ = new HashSet<EndPoint>();
+    private Set<InetAddress> seeds_ = new HashSet<InetAddress>();
 
     /* map where key is the endpoint and value is the state associated with the endpoint */
-    Map<EndPoint, EndPointState> endPointStateMap_ = new Hashtable<EndPoint, EndPointState>();
+    Map<InetAddress, EndPointState> endPointStateMap_ = new Hashtable<InetAddress, EndPointState>();
 
     private Gossiper()
     {
@@ -155,24 +155,24 @@
         subscribers_.remove(subscriber);
     }
 
-    public Set<EndPoint> getAllMembers()
+    public Set<InetAddress> getAllMembers()
     {
-        Set<EndPoint> allMbrs = new HashSet<EndPoint>();
+        Set<InetAddress> allMbrs = new HashSet<InetAddress>();
         allMbrs.addAll(getLiveMembers());
         allMbrs.addAll(getUnreachableMembers());
         return allMbrs;
     }
 
-    public Set<EndPoint> getLiveMembers()
+    public Set<InetAddress> getLiveMembers()
     {
-        Set<EndPoint> liveMbrs = new HashSet<EndPoint>(liveEndpoints_);
-        liveMbrs.add( new EndPoint( localEndPoint_.getHost(), localEndPoint_.getPort() ) );
+        Set<InetAddress> liveMbrs = new HashSet<InetAddress>(liveEndpoints_);
+        liveMbrs.add(localEndPoint_);
         return liveMbrs;
     }
 
-    public Set<EndPoint> getUnreachableMembers()
+    public Set<InetAddress> getUnreachableMembers()
     {
-        return new HashSet<EndPoint>(unreachableEndpoints_);
+        return new HashSet<InetAddress>(unreachableEndpoints_);
     }
 
     /**
@@ -181,7 +181,7 @@
      *
      * param@ ep the endpoint to be removed from membership.
      */
-    public synchronized void removeFromMembership(EndPoint ep)
+    public synchronized void removeFromMembership(InetAddress ep)
     {
         endPointStateMap_.remove(ep);
         liveEndpoints_.remove(ep);
@@ -195,7 +195,7 @@
      * param @ endpoint end point that is convicted.
     */
 
-    public void convict(EndPoint endpoint)
+    public void convict(InetAddress endpoint)
     {
         EndPointState epState = endPointStateMap_.get(endpoint);
         if ( epState != null )
@@ -208,7 +208,7 @@
                 */
                 if ( liveEndpoints_.contains(endpoint) )
                 {
-                    logger_.info("EndPoint " + endpoint + " is now dead.");
+                    logger_.info("InetAddress " + endpoint + " is now dead.");
                     isAlive(endpoint, epState, false);
 
                     /* Notify an endpoint is dead to interested parties. */
@@ -226,12 +226,12 @@
      *
      * param @ endpoint end point that is suspected.
     */
-    public void suspect(EndPoint endpoint)
+    public void suspect(InetAddress endpoint)
     {
         EndPointState epState = endPointStateMap_.get(endpoint);
         if ( epState.isAlive() )
         {
-            logger_.info("EndPoint " + endpoint + " is now dead.");
+            logger_.info("InetAddress " + endpoint + " is now dead.");
             isAlive(endpoint, epState, false);
 
             /* Notify an endpoint is dead to interested parties. */
@@ -265,7 +265,7 @@
      *
      * @param endpoint endpoint to be removed from the current membership.
     */
-    void evictFromMembership(EndPoint endpoint)
+    void evictFromMembership(InetAddress endpoint)
     {
         unreachableEndpoints_.remove(endpoint);
     }
@@ -280,7 +280,7 @@
         int maxVersion = getMaxEndPointStateVersion(epState);
         gDigests.add( new GossipDigest(localEndPoint_, generation, maxVersion) );
 
-        for ( EndPoint liveEndPoint : liveEndpoints_ )
+        for ( InetAddress liveEndPoint : liveEndpoints_ )
         {
             epState = endPointStateMap_.get(liveEndPoint);
             if ( epState != null )
@@ -311,9 +311,9 @@
         int maxVersion = getMaxEndPointStateVersion(epState);
         gDigests.add( new GossipDigest(localEndPoint_, generation, maxVersion) );
 
-        List<EndPoint> endpoints = new ArrayList<EndPoint>( liveEndpoints_ );
+        List<InetAddress> endpoints = new ArrayList<InetAddress>( liveEndpoints_ );
         Collections.shuffle(endpoints, random_);
-        for ( EndPoint liveEndPoint : endpoints )
+        for ( InetAddress liveEndPoint : endpoints )
         {
             epState = endPointStateMap_.get(liveEndPoint);
             if ( epState != null )
@@ -339,7 +339,7 @@
             logger_.trace("Gossip Digests are : " + sb.toString());
     }
 
-    public int getCurrentGenerationNumber(EndPoint endpoint)
+    public int getCurrentGenerationNumber(InetAddress endpoint)
     {
     	return endPointStateMap_.get(endpoint).getHeartBeatState().getGeneration();
     }
@@ -377,14 +377,14 @@
     boolean sendGossipToLiveNode(Message message)
     {
         int size = liveEndpoints_.size();
-        List<EndPoint> eps = new ArrayList<EndPoint>(liveEndpoints_);
+        List<InetAddress> eps = new ArrayList<InetAddress>(liveEndpoints_);
 
         if ( rrIndex_ >= size )
         {
             rrIndex_ = -1;
         }
 
-        EndPoint to = eps.get(++rrIndex_);
+        InetAddress to = eps.get(++rrIndex_);
         if (logger_.isTraceEnabled())
             logger_.trace("Sending a GossipDigestSynMessage to " + to + " ...");
         MessagingService.instance().sendUdpOneWay(message, to);
@@ -398,13 +398,13 @@
      *  @param epSet a set of endpoint from which a random endpoint is chosen.
      *  @return true if the chosen endpoint is also a seed.
      */
-    boolean sendGossip(Message message, Set<EndPoint> epSet)
+    boolean sendGossip(Message message, Set<InetAddress> epSet)
     {
         int size = epSet.size();
         /* Generate a random number from 0 -> size */
-        List<EndPoint> liveEndPoints = new ArrayList<EndPoint>(epSet);
+        List<InetAddress> liveEndPoints = new ArrayList<InetAddress>(epSet);
         int index = (size == 1) ? 0 : random_.nextInt(size);
-        EndPoint to = liveEndPoints.get(index);
+        InetAddress to = liveEndPoints.get(index);
         if (logger_.isTraceEnabled())
             logger_.trace("Sending a GossipDigestSynMessage to " + to + " ...");
         MessagingService.instance().sendUdpOneWay(message, to);
@@ -465,9 +465,9 @@
 
     void doStatusCheck()
     {
-        Set<EndPoint> eps = endPointStateMap_.keySet();
+        Set<InetAddress> eps = endPointStateMap_.keySet();
 
-        for ( EndPoint endpoint : eps )
+        for ( InetAddress endpoint : eps )
         {
             if ( endpoint.equals(localEndPoint_) )
                 continue;
@@ -485,12 +485,12 @@
         }
     }
 
-    EndPointState getEndPointStateForEndPoint(EndPoint ep)
+    EndPointState getEndPointStateForEndPoint(InetAddress ep)
     {
         return endPointStateMap_.get(ep);
     }
 
-    synchronized EndPointState getStateForVersionBiggerThan(EndPoint forEndpoint, int version)
+    synchronized EndPointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version)
     {
         EndPointState epState = endPointStateMap_.get(forEndpoint);
         EndPointState reqdEndPointState = null;
@@ -536,7 +536,7 @@
      * when a new node coming up multicasts the JoinMessage. Here we need
      * to add the endPoint to the list of live endpoints.
     */
-    synchronized void join(EndPoint from)
+    synchronized void join(InetAddress from)
     {
         if ( !from.equals( localEndPoint_ ) )
         {
@@ -580,11 +580,11 @@
         }
     }
 
-    void notifyFailureDetector(Map<EndPoint, EndPointState> remoteEpStateMap)
+    void notifyFailureDetector(Map<InetAddress, EndPointState> remoteEpStateMap)
     {
         IFailureDetector fd = FailureDetector.instance();
-        Set<EndPoint> endpoints = remoteEpStateMap.keySet();
-        for ( EndPoint endpoint : endpoints )
+        Set<InetAddress> endpoints = remoteEpStateMap.keySet();
+        for ( InetAddress endpoint : endpoints )
         {
             EndPointState remoteEndPointState = remoteEpStateMap.get(endpoint);
             EndPointState localEndPointState = endPointStateMap_.get(endpoint);
@@ -616,18 +616,18 @@
         }
     }
 
-    void markAlive(EndPoint addr, EndPointState localState)
+    void markAlive(InetAddress addr, EndPointState localState)
     {
         if (logger_.isTraceEnabled())
             logger_.trace("marking as alive " + addr);
         if ( !localState.isAlive() )
         {
             isAlive(addr, localState, true);
-            logger_.info("EndPoint " + addr + " is now UP");
+            logger_.info("InetAddress " + addr + " is now UP");
         }
     }
 
-    private void handleNewJoin(EndPoint ep, EndPointState epState)
+    private void handleNewJoin(InetAddress ep, EndPointState epState)
     {
     	logger_.info("Node " + ep + " has now joined.");
         /* Mark this endpoint as "live" */
@@ -637,10 +637,10 @@
         doNotifications(ep, epState);
     }
 
-    synchronized void applyStateLocally(Map<EndPoint, EndPointState> epStateMap)
+    synchronized void applyStateLocally(Map<InetAddress, EndPointState> epStateMap)
     {
-        Set<EndPoint> eps = epStateMap.keySet();
-        for( EndPoint ep : eps )
+        Set<InetAddress> eps = epStateMap.keySet();
+        for( InetAddress ep : eps )
         {
             if ( ep.equals( localEndPoint_ ) )
                 continue;
@@ -681,7 +681,7 @@
         }
     }
 
-    void applyHeartBeatStateLocally(EndPoint addr, EndPointState localState, EndPointState remoteState)
+    void applyHeartBeatStateLocally(InetAddress addr, EndPointState localState, EndPointState remoteState)
     {
         HeartBeatState localHbState = localState.getHeartBeatState();
         HeartBeatState remoteHbState = remoteState.getHeartBeatState();
@@ -703,7 +703,7 @@
         }
     }
 
-    void applyApplicationStateLocally(EndPoint addr, EndPointState localStatePtr, EndPointState remoteStatePtr)
+    void applyApplicationStateLocally(InetAddress addr, EndPointState localStatePtr, EndPointState remoteStatePtr)
     {
         Map<String, ApplicationState> localAppStateMap = localStatePtr.getApplicationState();
         Map<String, ApplicationState> remoteAppStateMap = remoteStatePtr.getApplicationState();
@@ -757,7 +757,7 @@
         }
     }
 
-    void doNotifications(EndPoint addr, EndPointState epState)
+    void doNotifications(InetAddress addr, EndPointState epState)
     {
         for ( IEndPointStateChangeSubscriber subscriber : subscribers_ )
         {
@@ -765,7 +765,7 @@
         }
     }
 
-    synchronized void isAlive(EndPoint addr, EndPointState epState, boolean value)
+    synchronized void isAlive(InetAddress addr, EndPointState epState, boolean value)
     {
         epState.isAlive(value);
         if ( value )
@@ -784,9 +784,9 @@
     }
 
     /* These are helper methods used from GossipDigestSynVerbHandler */
-    Map<EndPoint, GossipDigest> getEndPointGossipDigestMap(List<GossipDigest> gDigestList)
+    Map<InetAddress, GossipDigest> getEndPointGossipDigestMap(List<GossipDigest> gDigestList)
     {
-        Map<EndPoint, GossipDigest> epMap = new HashMap<EndPoint, GossipDigest>();
+        Map<InetAddress, GossipDigest> epMap = new HashMap<InetAddress, GossipDigest>();
         for( GossipDigest gDigest : gDigestList )
         {
             epMap.put( gDigest.getEndPoint(), gDigest );
@@ -795,14 +795,14 @@
     }
 
     /* This is a helper method to get all EndPoints from a list of GossipDigests */
-    EndPoint[] getEndPointsFromGossipDigest(List<GossipDigest> gDigestList)
+    InetAddress[] getEndPointsFromGossipDigest(List<GossipDigest> gDigestList)
     {
-        Set<EndPoint> set = new HashSet<EndPoint>();
+        Set<InetAddress> set = new HashSet<InetAddress>();
         for ( GossipDigest gDigest : gDigestList )
         {
             set.add( gDigest.getEndPoint() );
         }
-        return set.toArray( new EndPoint[0] );
+        return set.toArray( new InetAddress[0] );
     }
 
     /* Request all the state for the endpoint in the gDigest */
@@ -813,7 +813,7 @@
     }
 
     /* Send all the data with version greater than maxRemoteVersion */
-    void sendAll(GossipDigest gDigest, Map<EndPoint, EndPointState> deltaEpStateMap, int maxRemoteVersion)
+    void sendAll(GossipDigest gDigest, Map<InetAddress, EndPointState> deltaEpStateMap, int maxRemoteVersion)
     {
         EndPointState localEpStatePtr = getStateForVersionBiggerThan(gDigest.getEndPoint(), maxRemoteVersion) ;
         if ( localEpStatePtr != null )
@@ -824,7 +824,7 @@
         This method is used to figure the state that the Gossiper has but Gossipee doesn't. The delta digests
         and the delta state are built up.
     */
-    synchronized void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<EndPoint, EndPointState> deltaEpStateMap)
+    synchronized void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndPointState> deltaEpStateMap)
     {
         for ( GossipDigest gDigest : gDigestList )
         {
@@ -887,16 +887,14 @@
      * Start the gossiper with the generation # retrieved from the System
      * table
      */
-    public void start(EndPoint localEndPoint, int generationNbr) throws IOException
+    public void start(InetAddress localEndPoint, int generationNbr) throws IOException
     {
         localEndPoint_ = localEndPoint;
         /* Get the seeds from the config and initialize them. */
-        Set<String> seedHosts = DatabaseDescriptor.getSeeds();
-        for( String seedHost : seedHosts )
+        Set<InetAddress> seedHosts = DatabaseDescriptor.getSeeds();
+        for (InetAddress seed : seedHosts)
         {
-            EndPoint seed = new EndPoint(InetAddress.getByName(seedHost).getHostAddress(),
-                                         DatabaseDescriptor.getControlPort());
-            if ( seed.equals(localEndPoint) )
+            if (seed.equals(localEndPoint))
                 continue;
             seeds_.add(seed);
         }
@@ -943,7 +941,7 @@
 
     public void doVerb(Message message)
     {
-        EndPoint from = message.getFrom();
+        InetAddress from = message.getFrom();
         if (logger_.isDebugEnabled())
           logger_.debug("Received a JoinMessage from " + from);
 
@@ -972,7 +970,7 @@
 
     public void doVerb(Message message)
     {
-        EndPoint from = message.getFrom();
+        InetAddress from = message.getFrom();
         if (logger_.isTraceEnabled())
             logger_.trace("Received a GossipDigestSynMessage from " + from);
 
@@ -993,7 +991,7 @@
             doSort(gDigestList);
 
             List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
-            Map<EndPoint, EndPointState> deltaEpStateMap = new HashMap<EndPoint, EndPointState>();
+            Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>();
             Gossiper.instance().examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
 
             GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
@@ -1011,14 +1009,14 @@
     /*
      * First construct a map whose key is the endpoint in the GossipDigest and the value is the
      * GossipDigest itself. Then build a list of version differences i.e difference between the
-     * version in the GossipDigest and the version in the local state for a given EndPoint.
+     * version in the GossipDigest and the version in the local state for a given InetAddress.
      * Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding
      * to the endpoint from the map that was initially constructed.
     */
     private void doSort(List<GossipDigest> gDigestList)
     {
         /* Construct a map of endpoint to GossipDigest. */
-        Map<EndPoint, GossipDigest> epToDigestMap = new HashMap<EndPoint, GossipDigest>();
+        Map<InetAddress, GossipDigest> epToDigestMap = new HashMap<InetAddress, GossipDigest>();
         for ( GossipDigest gDigest : gDigestList )
         {
             epToDigestMap.put(gDigest.getEndPoint(), gDigest);
@@ -1031,7 +1029,7 @@
         List<GossipDigest> diffDigests = new ArrayList<GossipDigest>();
         for ( GossipDigest gDigest : gDigestList )
         {
-            EndPoint ep = gDigest.getEndPoint();
+            InetAddress ep = gDigest.getEndPoint();
             EndPointState epState = Gossiper.instance().getEndPointStateForEndPoint(ep);
             int version = (epState != null) ? Gossiper.instance().getMaxEndPointStateVersion( epState ) : 0;
             int diffVersion = Math.abs(version - gDigest.getMaxVersion() );
@@ -1058,7 +1056,7 @@
 
     public void doVerb(Message message)
     {
-        EndPoint from = message.getFrom();
+        InetAddress from = message.getFrom();
         if (logger_.isTraceEnabled())
             logger_.trace("Received a GossipDigestAckMessage from " + from);
 
@@ -1069,7 +1067,7 @@
         {
             GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis);
             List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
-            Map<EndPoint, EndPointState> epStateMap = gDigestAckMessage.getEndPointStateMap();
+            Map<InetAddress, EndPointState> epStateMap = gDigestAckMessage.getEndPointStateMap();
 
             if ( epStateMap.size() > 0 )
             {
@@ -1079,10 +1077,10 @@
             }
 
             /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
-            Map<EndPoint, EndPointState> deltaEpStateMap = new HashMap<EndPoint, EndPointState>();
+            Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>();
             for( GossipDigest gDigest : gDigestList )
             {
-                EndPoint addr = gDigest.getEndPoint();
+                InetAddress addr = gDigest.getEndPoint();
                 EndPointState localEpStatePtr = Gossiper.instance().getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
                 if ( localEpStatePtr != null )
                     deltaEpStateMap.put(addr, localEpStatePtr);
@@ -1107,7 +1105,7 @@
 
     public void doVerb(Message message)
     {
-        EndPoint from = message.getFrom();
+        InetAddress from = message.getFrom();
         if (logger_.isTraceEnabled())
             logger_.trace("Received a GossipDigestAck2Message from " + from);
 
@@ -1122,7 +1120,7 @@
         {
             throw new RuntimeException(e);
         }
-        Map<EndPoint, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
+        Map<InetAddress, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
         /* Notify the Failure Detector */
         Gossiper.instance().notifyFailureDetector(remoteEpStateMap);
         Gossiper.instance().applyStateLocally(remoteEpStateMap);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java Wed Oct 21 18:26:02 2009
@@ -18,7 +18,7 @@
 
 package org.apache.cassandra.gms;
 
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 
 /**
  * This is called by an instance of the IEndPointStateChangePublisher to notify
@@ -38,5 +38,5 @@
      * @param endpoint endpoint for which the state change occurred.
      * @param epState state that actually changed for the above endpoint.
      */
-    public void onChange(EndPoint endpoint, EndPointState epState);
+    public void onChange(InetAddress endpoint, EndPointState epState);
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java Wed Oct 21 18:26:02 2009
@@ -18,7 +18,7 @@
 
 package org.apache.cassandra.gms;
 
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 
 /**
  * Implemented by the Gossiper to either convict/suspect an endpoint
@@ -32,11 +32,11 @@
      * Convict the specified endpoint.
      * @param ep endpoint to be convicted
      */
-    public void convict(EndPoint ep);
+    public void convict(InetAddress ep);
     
     /**
      * Suspect the specified endpoint.
      * @param ep endpoint to be suspected.
      */
-    public void suspect(EndPoint ep);    
+    public void suspect(InetAddress ep);
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java Wed Oct 21 18:26:02 2009
@@ -18,7 +18,7 @@
 
 package org.apache.cassandra.gms;
 
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 
 /**
  * An interface that provides an application with the ability
@@ -36,7 +36,7 @@
      * @param ep endpoint in question.
      * @return true if UP and false if DOWN.
      */
-    public boolean isAlive(EndPoint ep);
+    public boolean isAlive(InetAddress ep);
     
     /**
      * This method is invoked by any entity wanting to interrogate the status of an endpoint. 
@@ -45,7 +45,7 @@
      * 
      * param ep endpoint for which we interpret the inter arrival times.
     */
-    public void interpret(EndPoint ep);
+    public void interpret(InetAddress ep);
     
     /**
      * This method is invoked by the receiver of the heartbeat. In our case it would be
@@ -54,7 +54,7 @@
      * 
      * param ep endpoint being reported.
     */
-    public void report(EndPoint ep);
+    public void report(InetAddress ep);
     
     /**
      * Register interest for Failure Detector events. 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureNotification.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureNotification.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureNotification.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureNotification.java Wed Oct 21 18:26:02 2009
@@ -18,10 +18,10 @@
 
 package org.apache.cassandra.gms;
 
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 
 public interface IFailureNotification
 {   
-    public void suspect(EndPoint ep);
-    public void revive(EndPoint ep);
+    public void suspect(InetAddress ep);
+    public void revive(InetAddress ep);
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Wed Oct 21 18:26:02 2009
@@ -26,8 +26,9 @@
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.StorageService;
+import java.net.InetAddress;
+
+import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * This class contains a helper method that will be used by
@@ -51,9 +52,9 @@
         storagePort_ = storagePort;
     }
 
-    public abstract EndPoint[] getReadStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap);
+    public abstract InetAddress[] getReadStorageEndPoints(Token token, Map<Token, InetAddress> tokenToEndPointMap);
 
-    public EndPoint[] getReadStorageEndPoints(Token token)
+    public InetAddress[] getReadStorageEndPoints(Token token)
     {
         return getReadStorageEndPoints(token, tokenMetadata_.cloneTokenEndPointMap());
     }
@@ -63,7 +64,7 @@
      * on which the data is being placed and the value is the
      * endpoint to which it should be forwarded.
      */
-    public Map<EndPoint, EndPoint> getHintedStorageEndPoints(Token token, EndPoint[] naturalEndpoints)
+    public Map<InetAddress, InetAddress> getHintedStorageEndPoints(Token token, InetAddress[] naturalEndpoints)
     {
         return getHintedMapForEndpoints(getWriteStorageEndPoints(token, naturalEndpoints));
     }
@@ -76,14 +77,14 @@
      *
      * Only ReplicationStrategy should care about this method (higher level users should only ask for Hinted).
      */
-    protected EndPoint[] getWriteStorageEndPoints(Token token, EndPoint[] naturalEndpoints)
+    public InetAddress[] getWriteStorageEndPoints(Token token, InetAddress[] naturalEndpoints)
     {
-        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        Map<Token, EndPoint> bootstrapTokensToEndpointMap = tokenMetadata_.cloneBootstrapNodes();
-        ArrayList<EndPoint> list = new ArrayList<EndPoint>(Arrays.asList(naturalEndpoints));
+        Map<Token, InetAddress> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        Map<Token, InetAddress> bootstrapTokensToEndpointMap = tokenMetadata_.cloneBootstrapNodes();
+        ArrayList<InetAddress> list = new ArrayList<InetAddress>(Arrays.asList(getReadStorageEndPoints(token, tokenToEndPointMap)));
         for (Token t : bootstrapTokensToEndpointMap.keySet())
         {
-            EndPoint ep = bootstrapTokensToEndpointMap.get(t);
+            InetAddress ep = bootstrapTokensToEndpointMap.get(t);
             tokenToEndPointMap.put(t, ep);
             try
             {
@@ -101,29 +102,15 @@
                 tokenToEndPointMap.remove(t);
             }
         }
-        return retrofitPorts(list).toArray(new EndPoint[list.size()]);
-    }
-
-    /*
-     * This method changes the ports of the endpoints from
-     * the control port to the storage ports.
-    */
-    public List<EndPoint> retrofitPorts(List<EndPoint> eps)
-    {
-        List<EndPoint> retrofitted = new ArrayList<EndPoint>();
-        for ( EndPoint ep : eps )
-        {
-            retrofitted.add(new EndPoint(ep.getHost(), ep.getPort()));
-        }
-        return retrofitted;
+        return list.toArray(new InetAddress[list.size()]);
     }
 
-    private Map<EndPoint, EndPoint> getHintedMapForEndpoints(EndPoint[] topN)
+    private Map<InetAddress, InetAddress> getHintedMapForEndpoints(InetAddress[] topN)
     {
-        Set<EndPoint> usedEndpoints = new HashSet<EndPoint>();
-        Map<EndPoint, EndPoint> map = new HashMap<EndPoint, EndPoint>();
+        Set<InetAddress> usedEndpoints = new HashSet<InetAddress>();
+        Map<InetAddress, InetAddress> map = new HashMap<InetAddress, InetAddress>();
 
-        for (EndPoint ep : topN)
+        for (InetAddress ep : topN)
         {
             if (FailureDetector.instance().isAlive(ep))
             {
@@ -133,8 +120,8 @@
             else
             {
                 // find another endpoint to store a hint on.  prefer endpoints that aren't already in use
-                EndPoint hintLocation = null;
-                Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+                InetAddress hintLocation = null;
+                Map<Token, InetAddress> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
                 List tokens = new ArrayList(tokenToEndPointMap.keySet());
                 Collections.sort(tokens);
                 Token token = tokenMetadata_.getToken(ep);
@@ -149,7 +136,7 @@
                 int startIndex = (index + 1) % totalNodes;
                 for (int i = startIndex, count = 1; count < totalNodes; ++count, i = (i + 1) % totalNodes)
                 {
-                    EndPoint tmpEndPoint = tokenToEndPointMap.get(tokens.get(i));
+                    InetAddress tmpEndPoint = tokenToEndPointMap.get(tokens.get(i));
                     if (FailureDetector.instance().isAlive(tmpEndPoint) && !Arrays.asList(topN).contains(tmpEndPoint) && !usedEndpoints.contains(tmpEndPoint))
                     {
                         hintLocation = tmpEndPoint;
@@ -158,7 +145,7 @@
                 }
                 // if all endpoints are already in use, might as well store it locally to save the network trip
                 if (hintLocation == null)
-                    hintLocation = StorageService.getLocalControlEndPoint();
+                    hintLocation = FBUtilities.getLocalAddress();
 
                 map.put(hintLocation, ep);
                 usedEndpoints.add(hintLocation);
@@ -169,11 +156,11 @@
 
     // TODO this is pretty inefficient.
     // fixing this probably requires merging tokenmetadata into replicationstrategy, so we can cache/invalidate cleanly
-    protected Map<EndPoint, Set<Range>> getRangeMap(Map<Token, EndPoint> tokenMap)
+    protected Map<InetAddress, Set<Range>> getRangeMap(Map<Token, InetAddress> tokenMap)
     {
-        Map<EndPoint, Set<Range>> map = new HashMap<EndPoint, Set<Range>>();
+        Map<InetAddress, Set<Range>> map = new HashMap<InetAddress, Set<Range>>();
 
-        for (EndPoint ep : tokenMap.values())
+        for (InetAddress ep : tokenMap.values())
         {
             map.put(ep, new HashSet<Range>());
         }
@@ -181,7 +168,7 @@
         for (Token token : tokenMap.keySet())
         {
             Range range = getPrimaryRangeFor(token, tokenMap);
-            for (EndPoint ep : getReadStorageEndPoints(token, tokenMap))
+            for (InetAddress ep : getReadStorageEndPoints(token, tokenMap))
             {
                 map.get(ep).add(range);
             }
@@ -190,17 +177,17 @@
         return map;
     }
 
-    public Map<EndPoint, Set<Range>> getRangeMap()
+    public Map<InetAddress, Set<Range>> getRangeMap()
     {
         return getRangeMap(tokenMetadata_.cloneTokenEndPointMap());
     }
 
-    public Range getPrimaryRangeFor(Token right, Map<Token, EndPoint> tokenToEndPointMap)
+    public Range getPrimaryRangeFor(Token right, Map<Token, InetAddress> tokenToEndPointMap)
     {
         return new Range(getPredecessor(right, tokenToEndPointMap), right);
     }
 
-    public Token getPredecessor(Token token, Map<Token, EndPoint> tokenToEndPointMap)
+    public Token getPredecessor(Token token, Map<Token, InetAddress> tokenToEndPointMap)
     {
         List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
         Collections.sort(tokens);
@@ -208,7 +195,7 @@
         return (Token) (index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(--index));
     }
 
-    public Token getSuccessor(Token token, Map<Token, EndPoint> tokenToEndPointMap)
+    public Token getSuccessor(Token token, Map<Token, InetAddress> tokenToEndPointMap)
     {
         List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
         Collections.sort(tokens);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java Wed Oct 21 18:26:02 2009
@@ -20,11 +20,11 @@
 
 import java.net.*;
 
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 
 public class EndPointSnitch implements IEndPointSnitch
 {
-    public boolean isOnSameRack(EndPoint host, EndPoint host2) throws UnknownHostException
+    public boolean isOnSameRack(InetAddress host, InetAddress host2) throws UnknownHostException
     {
         /*
          * Look at the IP Address of the two hosts. Compare 
@@ -37,7 +37,7 @@
         return ( ip[2] == ip2[2] );
     }
     
-    public boolean isInSameDataCenter(EndPoint host, EndPoint host2) throws UnknownHostException
+    public boolean isInSameDataCenter(InetAddress host, InetAddress host2) throws UnknownHostException
     {
         /*
          * Look at the IP Address of the two hosts. Compare 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java Wed Oct 21 18:26:02 2009
@@ -20,7 +20,7 @@
 
 import java.net.UnknownHostException;
 
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 
 
 /**
@@ -38,7 +38,7 @@
      * @return true if on the same rack false otherwise
      * @throws UnknownHostException
      */
-    public boolean isOnSameRack(EndPoint host, EndPoint host2) throws UnknownHostException;
+    public boolean isOnSameRack(InetAddress host, InetAddress host2) throws UnknownHostException;
     
     /**
      * Helps determine if 2 nodes are in the same data center.
@@ -47,5 +47,5 @@
      * @return true if in the same data center false otherwise
      * @throws UnknownHostException
      */
-    public boolean isInSameDataCenter(EndPoint host, EndPoint host2) throws UnknownHostException;
+    public boolean isInSameDataCenter(InetAddress host, InetAddress host2) throws UnknownHostException;
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java Wed Oct 21 18:26:02 2009
@@ -26,7 +26,7 @@
 
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.LogUtil;
 
@@ -44,10 +44,10 @@
         super(tokenMetadata, partitioner, replicas, storagePort);
     }
 
-    public EndPoint[] getReadStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
+    public InetAddress[] getReadStorageEndPoints(Token token, Map<Token, InetAddress> tokenToEndPointMap)
     {
         int startIndex;
-        List<EndPoint> list = new ArrayList<EndPoint>();
+        List<InetAddress> list = new ArrayList<InetAddress>();
         boolean bDataCenter = false;
         boolean bOtherRack = false;
         int foundCount = 0;
@@ -66,7 +66,7 @@
         foundCount++;
         if( replicas_ == 1 )
         {
-            return list.toArray(new EndPoint[list.size()]);
+            return list.toArray(new InetAddress[list.size()]);
         }
         startIndex = (index + 1)%totalNodes;
         IEndPointSnitch endPointSnitch = StorageService.instance().getEndPointSnitch();
@@ -117,6 +117,6 @@
                 foundCount++;
             }
         }
-        return retrofitPorts(list).toArray(new EndPoint[list.size()]);
+        return list.toArray(new InetAddress[list.size()]);
     }
 }
\ No newline at end of file

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java Wed Oct 21 18:26:02 2009
@@ -25,7 +25,7 @@
 
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 
 /**
  * This class returns the nodes responsible for a given
@@ -40,7 +40,7 @@
         super(tokenMetadata, partitioner, replicas, storagePort);
     }
 
-    public EndPoint[] getReadStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
+    public InetAddress[] getReadStorageEndPoints(Token token, Map<Token, InetAddress> tokenToEndPointMap)
     {
         int startIndex;
         List<Token> tokenList = new ArrayList<Token>();
@@ -74,9 +74,9 @@
                     foundCount++;
             }
         }
-        List<EndPoint> list = new ArrayList<EndPoint>();
+        List<InetAddress> list = new ArrayList<InetAddress>();
         for (Token t: tokenList)
             list.add(tokenToEndPointMap.get(t));
-        return retrofitPorts(list).toArray(new EndPoint[list.size()]);
+        return list.toArray(new InetAddress[list.size()]);
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Wed Oct 21 18:26:02 2009
@@ -23,7 +23,7 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.service.UnavailableException;
@@ -31,23 +31,23 @@
 public class TokenMetadata
 {
     /* Maintains token to endpoint map of every node in the cluster. */
-    private Map<Token, EndPoint> tokenToEndPointMap_;
+    private Map<Token, InetAddress> tokenToEndPointMap_;
     /* Maintains a reverse index of endpoint to token in the cluster. */
-    private Map<EndPoint, Token> endPointToTokenMap_;
+    private Map<InetAddress, Token> endPointToTokenMap_;
     /* Bootstrapping nodes and their tokens */
-    private Map<Token, EndPoint> bootstrapNodes;
+    private Map<Token, InetAddress> bootstrapNodes;
     
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock_ = new ReentrantReadWriteLock(true);
 
     public TokenMetadata()
     {
-        tokenToEndPointMap_ = new HashMap<Token, EndPoint>();
-        endPointToTokenMap_ = new HashMap<EndPoint, Token>();
-        this.bootstrapNodes = Collections.synchronizedMap(new HashMap<Token, EndPoint>());
+        tokenToEndPointMap_ = new HashMap<Token, InetAddress>();
+        endPointToTokenMap_ = new HashMap<InetAddress, Token>();
+        this.bootstrapNodes = Collections.synchronizedMap(new HashMap<Token, InetAddress>());
     }
 
-    public TokenMetadata(Map<Token, EndPoint> tokenToEndPointMap, Map<EndPoint, Token> endPointToTokenMap, Map<Token, EndPoint> bootstrapNodes)
+    public TokenMetadata(Map<Token, InetAddress> tokenToEndPointMap, Map<InetAddress, Token> endPointToTokenMap, Map<Token, InetAddress> bootstrapNodes)
     {
         tokenToEndPointMap_ = tokenToEndPointMap;
         endPointToTokenMap_ = endPointToTokenMap;
@@ -59,14 +59,14 @@
         return new TokenMetadata(cloneTokenEndPointMap(), cloneEndPointTokenMap(), cloneBootstrapNodes());
     }
         
-    public void update(Token token, EndPoint endpoint)
+    public void update(Token token, InetAddress endpoint)
     {
         this.update(token, endpoint, false);
     }
     /**
      * Update the two maps in an safe mode. 
     */
-    public void update(Token token, EndPoint endpoint, boolean bootstrapState)
+    public void update(Token token, InetAddress endpoint, boolean bootstrapState)
     {
         lock_.writeLock().lock();
         try
@@ -96,7 +96,7 @@
      * Remove the entries in the two maps.
      * @param endpoint
      */
-    public void remove(EndPoint endpoint)
+    public void remove(InetAddress endpoint)
     {
         lock_.writeLock().lock();
         try
@@ -112,7 +112,7 @@
         }
     }
     
-    public Token getToken(EndPoint endpoint)
+    public Token getToken(InetAddress endpoint)
     {
         lock_.readLock().lock();
         try
@@ -125,7 +125,7 @@
         }
     }
     
-    public boolean isKnownEndPoint(EndPoint ep)
+    public boolean isKnownEndPoint(InetAddress ep)
     {
         lock_.readLock().lock();
         try
@@ -138,7 +138,7 @@
         }
     }
 
-    public EndPoint getFirstEndpoint()
+    public InetAddress getFirstEndpoint()
     {
         lock_.readLock().lock();
         try
@@ -156,7 +156,7 @@
     }
     
 
-    public EndPoint getNextEndpoint(EndPoint endPoint) throws UnavailableException
+    public InetAddress getNextEndpoint(InetAddress endPoint) throws UnavailableException
     {
         lock_.readLock().lock();
         try
@@ -167,7 +167,7 @@
             Collections.sort(tokens);
             int i = tokens.indexOf(endPointToTokenMap_.get(endPoint)); // TODO binary search
             int j = 1;
-            EndPoint ep;
+            InetAddress ep;
             while (!FailureDetector.instance().isAlive((ep = tokenToEndPointMap_.get(tokens.get((i + j) % tokens.size())))))
             {
                 if (++j > DatabaseDescriptor.getReplicationFactor())
@@ -183,12 +183,12 @@
         }
     }
     
-    public Map<Token, EndPoint> cloneBootstrapNodes()
+    public Map<Token, InetAddress> cloneBootstrapNodes()
     {
         lock_.readLock().lock();
         try
         {            
-            return new HashMap<Token, EndPoint>( bootstrapNodes );
+            return new HashMap<Token, InetAddress>( bootstrapNodes );
         }
         finally
         {
@@ -200,12 +200,12 @@
     /*
      * Returns a safe clone of tokenToEndPointMap_.
     */
-    public Map<Token, EndPoint> cloneTokenEndPointMap()
+    public Map<Token, InetAddress> cloneTokenEndPointMap()
     {
         lock_.readLock().lock();
         try
         {            
-            return new HashMap<Token, EndPoint>( tokenToEndPointMap_ );
+            return new HashMap<Token, InetAddress>( tokenToEndPointMap_ );
         }
         finally
         {
@@ -216,12 +216,12 @@
     /*
      * Returns a safe clone of endPointTokenMap_.
     */
-    public Map<EndPoint, Token> cloneEndPointTokenMap()
+    public Map<InetAddress, Token> cloneEndPointTokenMap()
     {
         lock_.readLock().lock();
         try
         {            
-            return new HashMap<EndPoint, Token>( endPointToTokenMap_ );
+            return new HashMap<InetAddress, Token>( endPointToTokenMap_ );
         }
         finally
         {
@@ -232,9 +232,9 @@
     public String toString()
     {
         StringBuilder sb = new StringBuilder();
-        Set<EndPoint> eps = endPointToTokenMap_.keySet();
+        Set<InetAddress> eps = endPointToTokenMap_.keySet();
         
-        for ( EndPoint ep : eps )
+        for ( InetAddress ep : eps )
         {
             sb.append(ep);
             sb.append(":");
@@ -245,7 +245,7 @@
         return sb.toString();
     }
 
-    public EndPoint getEndPoint(Token token)
+    public InetAddress getEndPoint(Token token)
     {
         lock_.readLock().lock();
         try

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndPointSerializationHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndPointSerializationHelper.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndPointSerializationHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndPointSerializationHelper.java Wed Oct 21 18:26:02 2009
@@ -19,18 +19,19 @@
 package org.apache.cassandra.net;
 
 import java.io.*;
+import java.net.InetAddress;
 
 public class CompactEndPointSerializationHelper
 {
-    public static void serialize(EndPoint endPoint, DataOutputStream dos) throws IOException
+    public static void serialize(InetAddress endPoint, DataOutputStream dos) throws IOException
     {        
         dos.write(endPoint.getAddress());
     }
     
-    public static EndPoint deserialize(DataInputStream dis) throws IOException
+    public static InetAddress deserialize(DataInputStream dis) throws IOException
     {     
-        byte[] bytes = new byte[6];
+        byte[] bytes = new byte[4];
         dis.readFully(bytes, 0, bytes.length);
-        return EndPoint.getByAddress(bytes);
+        return InetAddress.getByAddress(bytes);
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Wed Oct 21 18:26:02 2009
@@ -20,6 +20,7 @@
 
 import java.io.*;
 import java.net.SocketException;
+import java.net.InetAddress;
 
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.utils.LogUtil;
@@ -32,10 +33,10 @@
     private String file_;
     private long startPosition_;
     private long total_;
-    private EndPoint from_;
-    private EndPoint to_;
+    private InetAddress from_;
+    private InetAddress to_;
     
-    FileStreamTask(String file, long startPosition, long total, EndPoint from, EndPoint to)
+    FileStreamTask(String file, long startPosition, long total, InetAddress from, InetAddress to)
     {
         file_ = file;
         startPosition_ = startPosition;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Wed Oct 21 18:26:02 2009
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.net.InetAddress;
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.utils.GuidGenerator;
@@ -43,13 +44,13 @@
         return serializer_;
     }
 
-    private EndPoint from_;
+    private InetAddress from_;
     private String type_;
     private String verb_;
     private String messageId_;
     protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
     
-    Header(String id, EndPoint from, String messageType, String verb)
+    Header(String id, InetAddress from, String messageType, String verb)
     {
         messageId_ = id;
         from_ = from;
@@ -57,13 +58,13 @@
         verb_ = verb;        
     }
     
-    Header(String id, EndPoint from, String messageType, String verb, Map<String, byte[]> details)
+    Header(String id, InetAddress from, String messageType, String verb, Map<String, byte[]> details)
     {
         this(id, from, messageType, verb);
         details_ = details;
     }
 
-    Header(EndPoint from, String messageType, String verb)
+    Header(InetAddress from, String messageType, String verb)
     {
         messageId_ = Integer.toString(idGen_.incrementAndGet());
         from_ = from;
@@ -71,7 +72,7 @@
         verb_ = verb;
     }        
 
-    EndPoint getFrom()
+    InetAddress getFrom()
     {
         return from_;
     }
@@ -153,7 +154,7 @@
     public Header deserialize(DataInputStream dis) throws IOException
     {
         String id = dis.readUTF();
-        EndPoint from = CompactEndPointSerializationHelper.deserialize(dis);
+        InetAddress from = CompactEndPointSerializationHelper.deserialize(dis);
         String type = dis.readUTF();
         String verb = dis.readUTF();
         

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java Wed Oct 21 18:26:02 2009
@@ -19,6 +19,7 @@
 package org.apache.cassandra.net;
 
 import java.io.IOException;
+import java.net.InetAddress;
 
 import javax.xml.bind.JAXBException;
 
@@ -45,7 +46,7 @@
      * Deregister all verbhandlers corresponding to localEndPoint.
      * @param localEndPoint
      */
-    public void deregisterAllVerbHandlers(EndPoint localEndPoint);
+    public void deregisterAllVerbHandlers(InetAddress localEndPoint);
     
     /**
      * Deregister a verbhandler corresponding to the verb from the
@@ -56,16 +57,16 @@
     
     /**
      * Listen on the specified port.
-     * @param ep EndPoint whose port to listen on.
+     * @param ep InetAddress whose port to listen on.
      * @param isHttp specify if the port is an Http port.     
      */
-    public void listen(EndPoint ep) throws IOException;
+    public void listen(InetAddress ep) throws IOException;
     
     /**
      * Listen on the specified port.
-     * @param ep EndPoint whose port to listen on.     
+     * @param ep InetAddress whose port to listen on.
      */
-    public void listenUDP(EndPoint ep);
+    public void listenUDP(InetAddress ep);
     
     /**
      * Send a message to a given endpoint. 
@@ -74,7 +75,7 @@
      * @return an reference to an IAsyncResult which can be queried for the
      * response
      */
-    public IAsyncResult sendRR(Message message, EndPoint to);
+    public IAsyncResult sendRR(Message message, InetAddress to);
 
     /**
      * Send a message to the given set of endpoints and informs the MessagingService
@@ -85,7 +86,7 @@
      * @param cb callback interface which is used to pass the responses
      * @return an reference to message id used to match with the result
      */
-    public String sendRR(Message message, EndPoint[] to, IAsyncCallback cb);
+    public String sendRR(Message message, InetAddress[] to, IAsyncCallback cb);
     
     /**
      * Send a message to a given endpoint. This method specifies a callback
@@ -97,7 +98,7 @@
      *           suggest that a timeout occurred to the invoker of the send().
      * @return an reference to message id used to match with the result
      */
-    public String sendRR(Message message, EndPoint to, IAsyncCallback cb);
+    public String sendRR(Message message, InetAddress to, IAsyncCallback cb);
 
     /**
      * Send a message to a given endpoint. The ith element in the <code>messages</code>
@@ -112,7 +113,7 @@
      *           suggest that a timeout occured to the invoker of the send().
      * @return an reference to message id used to match with the result
      */
-    public String sendRR(Message[] messages, EndPoint[] to, IAsyncCallback cb);
+    public String sendRR(Message[] messages, InetAddress[] to, IAsyncCallback cb);
 
     /**
      * Send a message to a given endpoint. This method adheres to the fire and forget
@@ -120,7 +121,7 @@
      * @param message messages to be sent.
      * @param to endpoint to which the message needs to be sent
      */
-    public void sendOneWay(Message message, EndPoint to);
+    public void sendOneWay(Message message, InetAddress to);
         
     /**
      * Send a message to a given endpoint. This method adheres to the fire and forget
@@ -128,7 +129,7 @@
      * @param message messages to be sent.
      * @param to endpoint to which the message needs to be sent
      */
-    public void sendUdpOneWay(Message message, EndPoint to);
+    public void sendUdpOneWay(Message message, InetAddress to);
     
     /**
      * Stream a file from source to destination. This is highly optimized
@@ -138,7 +139,7 @@
      * @param total number of bytes to stream
      * @param to endpoint to which we need to stream the file.
     */
-    public void stream(String file, long startPosition, long total, EndPoint from, EndPoint to);
+    public void stream(String file, long startPosition, long total, InetAddress from, InetAddress to);
 
     /**
      * This method returns the verb handler associated with the registered

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Wed Oct 21 18:26:02 2009
@@ -22,6 +22,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Map;
+import java.net.InetAddress;
 
 import org.apache.cassandra.io.ICompactSerializer;
 
@@ -52,7 +53,7 @@
         body_ = body;
     }
 
-    public Message(EndPoint from, String messageType, String verb, byte[] body)
+    public Message(InetAddress from, String messageType, String verb, byte[] body)
     {
         this(new Header(from, messageType, verb), body);
     }    
@@ -92,7 +93,7 @@
         return body_;
     }
 
-    public EndPoint getFrom()
+    public InetAddress getFrom()
     {
         return header_.getFrom();
     }
@@ -117,7 +118,7 @@
         header_.setMessageId(id);
     }    
 
-    public Message getReply(EndPoint from, byte[] args)
+    public Message getReply(InetAddress from, byte[] args)
     {
         Header header = new Header(getMessageId(), from, MessagingService.responseStage_, MessagingService.responseVerbHandler_);
         return new Message(header, args);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Oct 21 18:26:02 2009
@@ -28,6 +28,8 @@
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.net.SocketException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
@@ -63,10 +65,10 @@
     private static ICachetable<String, IAsyncResult> taskCompletionMap_;
     
     /* Manages the table of endpoints it is listening on */
-    private static Set<EndPoint> endPoints_;
+    private static Set<InetAddress> endPoints_;
     
     /* List of sockets we are listening on */
-    private static Map<EndPoint, SelectionKey> listenSockets_ = new HashMap<EndPoint, SelectionKey>();
+    private static Map<InetAddress, SelectionKey> listenSockets_ = new HashMap<InetAddress, SelectionKey>();
     
     /* Lookup table for registering message handlers based on the verb. */
     private static Map<String, IVerbHandler> verbHandlers_;
@@ -128,7 +130,7 @@
             reservedVerbs_.put(verbs.toString(), verbs.toString());
         }
         verbHandlers_ = new HashMap<String, IVerbHandler>();        
-        endPoints_ = new HashSet<EndPoint>();
+        endPoints_ = new HashSet<InetAddress>();
         /*
          * Leave callbacks in the cachetable long enough that any related messages will arrive
          * before the callback is evicted from the table. The concurrency level is set at 128
@@ -180,11 +182,11 @@
         return result;
     }
     
-    public void listen(EndPoint localEp) throws IOException
+    public void listen(InetAddress localEp) throws IOException
     {        
         ServerSocketChannel serverChannel = ServerSocketChannel.open();
         ServerSocket ss = serverChannel.socket();            
-        ss.bind(localEp.getInetAddress());
+        ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()));
         serverChannel.configureBlocking(false);
         
         SelectionKeyHandler handler = new TcpConnectionHandler(localEp);
@@ -194,14 +196,14 @@
         listenSockets_.put(localEp, key);             
     }
     
-    public void listenUDP(EndPoint localEp)
+    public void listenUDP(InetAddress localEp)
     {
         UdpConnection connection = new UdpConnection();
         if (logger_.isDebugEnabled())
           logger_.debug("Starting to listen on " + localEp);
         try
         {
-            connection.init(localEp.getPort());
+            connection.init(localEp);
             endPoints_.add(localEp);     
         }
         catch ( IOException e )
@@ -210,7 +212,7 @@
         }
     }
     
-    public static TcpConnectionManager getConnectionPool(EndPoint from, EndPoint to)
+    public static TcpConnectionManager getConnectionPool(InetAddress from, InetAddress to)
     {
         String key = from + ":" + to;
         TcpConnectionManager cp = poolTable_.get(key);
@@ -236,7 +238,7 @@
         return cp;
     }
 
-    public static TcpConnection getConnection(EndPoint from, EndPoint to) throws IOException
+    public static TcpConnection getConnection(InetAddress from, InetAddress to) throws IOException
     {
         return getConnectionPool(from, to).getConnection();
     }
@@ -255,7 +257,7 @@
     	verbHandlers_.put(type, verbHandler);
     }
     
-    public void deregisterAllVerbHandlers(EndPoint localEndPoint)
+    public void deregisterAllVerbHandlers(InetAddress localEndPoint)
     {
         Iterator keys = verbHandlers_.keySet().iterator();
         String key = null;
@@ -283,7 +285,7 @@
         return handler;
     }
 
-    public String sendRR(Message message, EndPoint[] to, IAsyncCallback cb)
+    public String sendRR(Message message, InetAddress[] to, IAsyncCallback cb)
     {
         String messageId = message.getMessageId();                        
         callbackMap_.put(messageId, cb);
@@ -294,7 +296,7 @@
         return messageId;
     }
     
-    public String sendRR(Message message, EndPoint to, IAsyncCallback cb)
+    public String sendRR(Message message, InetAddress to, IAsyncCallback cb)
     {        
         String messageId = message.getMessageId();
         callbackMap_.put(messageId, cb);
@@ -302,7 +304,7 @@
         return messageId;
     }
 
-    public String sendRR(Message[] messages, EndPoint[] to, IAsyncCallback cb)
+    public String sendRR(Message[] messages, InetAddress[] to, IAsyncCallback cb)
     {
         if ( messages.length != to.length )
         {
@@ -321,7 +323,7 @@
     /*
         Use this version for fire and forget style messaging.
     */
-    public void sendOneWay(Message message, EndPoint to)
+    public void sendOneWay(Message message, InetAddress to)
     {        
         // do local deliveries        
         if ( message.getFrom().equals(to) )
@@ -364,7 +366,7 @@
         }
     }
     
-    public IAsyncResult sendRR(Message message, EndPoint to)
+    public IAsyncResult sendRR(Message message, InetAddress to)
     {
         IAsyncResult iar = new AsyncResult();
         taskCompletionMap_.put(message.getMessageId(), iar);
@@ -372,7 +374,7 @@
         return iar;
     }
     
-    public void sendUdpOneWay(Message message, EndPoint to)
+    public void sendUdpOneWay(Message message, InetAddress to)
     {
         if (message.getFrom().equals(to)) {
             MessagingService.receive(message);
@@ -397,7 +399,7 @@
         }
     }
     
-    public void stream(String file, long startPosition, long total, EndPoint from, EndPoint to)
+    public void stream(String file, long startPosition, long total, InetAddress from, InetAddress to)
     {
         isStreaming_.set(true);
         /* Streaming asynchronously on streamExector_ threads. */

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Wed Oct 21 18:26:02 2009
@@ -29,6 +29,8 @@
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.io.FastSerializer;
@@ -51,8 +53,8 @@
     private TcpReader tcpReader_;    
     private ReadWorkItem readWork_ = new ReadWorkItem(); 
     private Queue<ByteBuffer> pendingWrites_ = new ConcurrentLinkedQueue<ByteBuffer>();
-    private EndPoint localEp_;
-    private EndPoint remoteEp_;
+    private InetAddress localEp_;
+    private InetAddress remoteEp_;
     boolean inUse_ = false;
          
     /* 
@@ -67,7 +69,7 @@
     private Condition condition_;
     
     // used from getConnection - outgoing
-    TcpConnection(TcpConnectionManager pool, EndPoint from, EndPoint to) throws IOException
+    TcpConnection(TcpConnectionManager pool, InetAddress from, InetAddress to) throws IOException
     {          
         socketChannel_ = SocketChannel.open();            
         socketChannel_.configureBlocking(false);        
@@ -75,8 +77,8 @@
         
         localEp_ = from;
         remoteEp_ = to;
-        
-        if ( !socketChannel_.connect( remoteEp_.getInetAddress() ) )
+
+        if (!socketChannel_.connect(new InetSocketAddress(remoteEp_, DatabaseDescriptor.getStoragePort())))
         {
             key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
         }
@@ -89,7 +91,7 @@
     /*
      * Used for streaming purposes has no pooling semantics.
     */
-    TcpConnection(EndPoint from, EndPoint to) throws IOException
+    TcpConnection(InetAddress from, InetAddress to) throws IOException
     {
         socketChannel_ = SocketChannel.open();               
         socketChannel_.configureBlocking(false);       
@@ -97,7 +99,7 @@
         localEp_ = from;
         remoteEp_ = to;
         
-        if ( !socketChannel_.connect( remoteEp_.getInetAddress() ) )
+        if (!socketChannel_.connect(new InetSocketAddress(remoteEp_, DatabaseDescriptor.getStoragePort())))
         {
             key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
         }
@@ -114,7 +116,7 @@
      * This method is invoked by the TcpConnectionHandler to accept incoming TCP connections.
      * Accept the connection and then register interest for reads.
     */
-    static void acceptConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
+    static void acceptConnection(SocketChannel socketChannel, InetAddress localEp, boolean isIncoming) throws IOException
     {
         TcpConnection tcpConnection = new TcpConnection(socketChannel, localEp, true);
         tcpConnection.registerReadInterest();
@@ -126,7 +128,7 @@
     }
     
     // used for incoming connections
-    TcpConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
+    TcpConnection(SocketChannel socketChannel, InetAddress localEp, boolean isIncoming) throws IOException
     {       
         socketChannel_ = socketChannel;
         socketChannel_.configureBlocking(false);                           
@@ -134,17 +136,17 @@
         localEp_ = localEp;
     }
     
-    EndPoint getLocalEp()
+    InetAddress getLocalEp()
     {
         return localEp_;
     }
     
-    public void setLocalEp(EndPoint localEp)
+    public void setLocalEp(InetAddress localEp)
     {
         localEp_ = localEp;
     }
 
-    public EndPoint getEndPoint() 
+    public InetAddress getEndPoint()
     {
         return remoteEp_;
     }
@@ -453,7 +455,7 @@
                         /* first message received */
                         if (remoteEp_ == null)
                         {
-                            remoteEp_ = new EndPoint(socketChannel_.socket().getInetAddress().getHostAddress(), DatabaseDescriptor.getStoragePort());
+                            remoteEp_ = socketChannel_.socket().getInetAddress();
                             // put connection into pool if possible
                             pool_ = MessagingService.getConnectionPool(localEp_, remoteEp_);                            
                             pool_.addToPool(TcpConnection.this);                            

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java Wed Oct 21 18:26:02 2009
@@ -28,9 +28,9 @@
 public class TcpConnectionHandler extends SelectionKeyHandler
 {
     private static Logger logger_ = Logger.getLogger(TcpConnectionHandler.class);
-    EndPoint localEp_;    
+    InetAddress localEp_;
     
-    public TcpConnectionHandler(EndPoint localEp) 
+    public TcpConnectionHandler(InetAddress localEp)
     {
         localEp_ = localEp;
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java Wed Oct 21 18:26:02 2009
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.locks.*;
+import java.net.InetAddress;
 
 import org.apache.log4j.Logger;
 
@@ -28,15 +29,15 @@
 {
     private Lock lock_ = new ReentrantLock();
     private List<TcpConnection> allConnections_;
-    private EndPoint localEp_;
-    private EndPoint remoteEp_;
+    private InetAddress localEp_;
+    private InetAddress remoteEp_;
     private int maxSize_;
 
     private int inUse_;
 
     // TODO! this whole thing is a giant no-op, since "contains" only relies on TcpConnection.equals, which
     // is true for any (local, remote) pairs.  So there is only ever at most one TcpConnection per Manager!
-    TcpConnectionManager(int initialSize, int growthFactor, int maxSize, EndPoint localEp, EndPoint remoteEp)
+    TcpConnectionManager(int initialSize, int growthFactor, int maxSize, InetAddress localEp, InetAddress remoteEp)
     {
         maxSize_ = maxSize;
         localEp_ = localEp;
@@ -187,12 +188,12 @@
         }
     }
 
-    EndPoint getLocalEndPoint()
+    InetAddress getLocalEndPoint()
     {
         return localEp_;
     }
 
-    EndPoint getRemoteEndPoint()
+    InetAddress getRemoteEndPoint()
     {
         return remoteEp_;
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java Wed Oct 21 18:26:02 2009
@@ -19,23 +19,22 @@
 package org.apache.cassandra.net;
 
 import java.net.SocketAddress;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.nio.*;
 import java.nio.channels.*;
-import java.util.*;
-import java.util.concurrent.*;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
-import org.apache.cassandra.net.io.ProtocolState;
-import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.utils.BasicUtilities;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
-import org.apache.cassandra.concurrent.*;
+
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
 
 public class UdpConnection extends SelectionKeyHandler
 {
@@ -44,8 +43,7 @@
     private static final int protocol_ = 0xBADBEEF;
     
     private DatagramChannel socketChannel_;
-    private SelectionKey key_;    
-    private EndPoint localEndPoint_;
+    private SelectionKey key_;
     
     public void init() throws IOException
     {
@@ -54,17 +52,16 @@
         socketChannel_.configureBlocking(false);        
     }
     
-    public void init(int port) throws IOException
+    public void init(InetAddress localEp) throws IOException
     {
-        localEndPoint_ = new EndPoint(FBUtilities.getHostAddress(), port);
         socketChannel_ = DatagramChannel.open();
-        socketChannel_.socket().bind(localEndPoint_.getInetAddress());
+        socketChannel_.socket().bind(new InetSocketAddress(localEp, DatabaseDescriptor.getControlPort()));
         socketChannel_.socket().setReuseAddress(true);
         socketChannel_.configureBlocking(false);        
         key_ = SelectorManager.getUdpSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
     }
     
-    public boolean write(Message message, EndPoint to) throws IOException
+    public boolean write(Message message, InetAddress to) throws IOException
     {
         boolean bVal = true;                       
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -81,7 +78,7 @@
             buffer.put(data);
             buffer.flip();
             
-            int n  = socketChannel_.send(buffer, to.getInetAddress());
+            int n  = socketChannel_.send(buffer, new InetSocketAddress(to, DatabaseDescriptor.getControlPort()));
             if ( n == 0 )
             {
                 bVal = false;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java Wed Oct 21 18:26:02 2009
@@ -19,6 +19,7 @@
 package org.apache.cassandra.net.io;
 
 import java.net.InetSocketAddress;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
@@ -43,9 +44,8 @@
         super(stream); 
         SocketChannel socketChannel = stream.getStream();
         InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
-        String remoteHost = remoteAddress.getAddress().getHostAddress();        
-        streamContext_ = StreamContextManager.getStreamContext(remoteHost);   
-        streamStatus_ = StreamContextManager.getStreamStatus(remoteHost);
+        streamContext_ = StreamContextManager.getStreamContext(remoteAddress.getAddress());
+        streamStatus_ = StreamContextManager.getStreamStatus(remoteAddress.getAddress());
     }
     
     private void createFileChannel() throws IOException
@@ -63,7 +63,6 @@
     {        
         SocketChannel socketChannel = stream_.getStream();
         InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
-        String remoteHostIp = remoteAddress.getAddress().getHostAddress();
         createFileChannel();
         if ( streamContext_ != null )
         {  
@@ -77,7 +76,7 @@
             {
                 /* Ask the source node to re-stream this file. */
                 streamStatus_.setAction(StreamContextManager.StreamCompletionAction.STREAM);                
-                handleStreamCompletion(remoteHostIp);
+                handleStreamCompletion(remoteAddress.getAddress());
                 /* Delete the orphaned file. */
                 File file = new File(streamContext_.getTargetFile());
                 file.delete();
@@ -87,7 +86,7 @@
             {       
                 if (logger_.isDebugEnabled())
                     logger_.debug("Removing stream context " + streamContext_);                 
-                handleStreamCompletion(remoteHostIp);                              
+                handleStreamCompletion(remoteAddress.getAddress());                              
                 bytesRead_ = 0L;
                 fc_.close();
                 morphState();
@@ -97,7 +96,7 @@
         return new byte[0];
     }
     
-    private void handleStreamCompletion(String remoteHost) throws IOException
+    private void handleStreamCompletion(InetAddress remoteHost) throws IOException
     {
         /* 
          * Streaming is complete. If all the data that has to be received inform the sender via 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java Wed Oct 21 18:26:02 2009
@@ -20,7 +20,7 @@
 
 import java.io.IOException;
 
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 
 public interface IStreamComplete
 {
@@ -28,5 +28,5 @@
      * This callback if registered with the StreamContextManager is 
      * called when the stream from a host is completely handled. 
     */
-    public void onStreamCompletion(String from, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException;
+    public void onStreamCompletion(InetAddress from, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException;
 }