You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/21 20:26:04 UTC

svn commit: r828130 [1/3] - in /incubator/cassandra/trunk: contrib/bmt_example/ contrib/property_snitch/src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/client/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/...

Author: jbellis
Date: Wed Oct 21 18:26:02 2009
New Revision: 828130

URL: http://svn.apache.org/viewvc?rev=828130&view=rev
Log:
Convert EndPoint to InetAddress. This also removes MembershipCleaner, the code to drop a node from the cluster entirely; we're probably going to want to resurrect that at some point.
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-498

Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ConnectionStatistics.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/EndPoint.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
Modified:
    incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
    incubator/cassandra/trunk/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureNotification.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndPointSerializationHelper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/CompactEndPointSerializationHelperTest.java

Modified: incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java (original)
+++ incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java Wed Oct 21 18:26:02 2009
@@ -59,7 +59,9 @@
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.dht.BigIntegerToken;
 import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.SelectorManager;
@@ -113,7 +115,16 @@
             for (String token : this.tokens)
             {
                 String[] values = token.split(":");
-                StorageService.instance().updateTokenMetadata(new BigIntegerToken(new BigInteger(values[0])),new EndPoint(values[1], 7000));
+                InetAddress address;
+                try
+                {
+                    address = InetAddress.getByName(values[1]);
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                StorageService.instance().updateTokenMetadataUnsafe(new BigIntegerToken(new BigInteger(values[0])), address);
             }
         }
         public void close()
@@ -160,10 +171,10 @@
 
             /* Get serialized message to send to cluster */
             message = createMessage(Keyspace, key.toString(), CFName, columnFamilies);
-            for (EndPoint endpoint: StorageService.instance().getReadStorageEndPoints(key.toString()))
+            for (InetAddress endpoint: StorageService.instance().getReadStorageEndPoints(key.toString()))
             {
                 /* Send message to end point */
-                MessagingService.getMessagingInstance().sendOneWay(message, endpoint);
+                MessagingService.instance().sendOneWay(message, endpoint);
             }
             
             output.collect(key, new Text(" inserted into Cassandra node(s)"));
@@ -234,7 +245,7 @@
                 throw new RuntimeException(e);
             }
         }
-        rm = new RowMutation(Keyspace,StorageService.getPartitioner().decorateKey(Key));
+        rm = new RowMutation(Keyspace, Key);
         rm.add(baseColumnFamily);
 
         try

Modified: incubator/cassandra/trunk/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java (original)
+++ incubator/cassandra/trunk/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java Wed Oct 21 18:26:02 2009
@@ -30,7 +30,7 @@
 import javax.management.ObjectName;
 
 import org.apache.cassandra.locator.EndPointSnitch;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
 
@@ -84,7 +84,7 @@
      * 
      * @return a array of string with the first index being the data center and the second being the rack
      */
-    public String[] getEndPointInfo(EndPoint endPoint) {
+    public String[] getEndPointInfo(InetAddress endPoint) {
         String key = endPoint.toString();
         String value = hostProperties.getProperty(key);
         if (value == null)
@@ -107,7 +107,7 @@
      * @param endPoint the endPoint to process
      * @return string of data center
      */
-    public String getDataCenterForEndPoint(EndPoint endPoint) {
+    public String getDataCenterForEndPoint(InetAddress endPoint) {
         return getEndPointInfo(endPoint)[0];
     }
 
@@ -118,12 +118,12 @@
      * 
      * @return string of rack
      */
-    public String getRackForEndPoint(EndPoint endPoint) {
+    public String getRackForEndPoint(InetAddress endPoint) {
         return getEndPointInfo(endPoint)[1];
     }
 
     @Override
-    public boolean isInSameDataCenter(EndPoint host, EndPoint host2)
+    public boolean isInSameDataCenter(InetAddress host, InetAddress host2)
             throws UnknownHostException {
         if (runInBaseMode) 
         {
@@ -133,7 +133,7 @@
     }
 
     @Override
-    public boolean isOnSameRack(EndPoint host, EndPoint host2)
+    public boolean isOnSameRack(InetAddress host, InetAddress host2)
             throws UnknownHostException {
         if (runInBaseMode) 
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java Wed Oct 21 18:26:02 2009
@@ -26,7 +26,9 @@
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
 import org.apache.cassandra.service.Cassandra;
 import org.apache.cassandra.service.CassandraServer;
 import org.apache.cassandra.service.StorageService;
@@ -52,7 +54,10 @@
 
     public RingCache()
     {
-        seeds_ = DatabaseDescriptor.getSeeds();
+        for (InetAddress seed : DatabaseDescriptor.getSeeds())
+        {
+            seeds_.add(seed.getHostAddress());
+        }
         refreshEndPointMap();
     }
 
@@ -69,14 +74,21 @@
 
                 Map<String,String> tokenToHostMap = (Map<String,String>) new JSONTokener(client.get_string_property(CassandraServer.TOKEN_MAP)).nextValue();
                 
-                HashMap<Token, EndPoint> tokenEndpointMap = new HashMap<Token, EndPoint>();
-                Map<EndPoint, Token> endpointTokenMap = new HashMap<EndPoint, Token>();
+                HashMap<Token, InetAddress> tokenEndpointMap = new HashMap<Token, InetAddress>();
+                Map<InetAddress, Token> endpointTokenMap = new HashMap<InetAddress, Token>();
                 for (Map.Entry<String,String> entry : tokenToHostMap.entrySet())
                 {
                     Token token = StorageService.getPartitioner().getTokenFactory().fromString(entry.getKey());
                     String host = entry.getValue();
-                    tokenEndpointMap.put(token, new EndPoint(host, port_));
-                    endpointTokenMap.put(new EndPoint(host, port_), token);
+                    try
+                    {
+                        tokenEndpointMap.put(token, InetAddress.getByName(host));
+                        endpointTokenMap.put(InetAddress.getByName(host), token);
+                    }
+                    catch (UnknownHostException e)
+                    {
+                        throw new AssertionError(e); // host strings are IPs
+                    }
                 }
 
                 TokenMetadata tokenMetadata = new TokenMetadata(tokenEndpointMap, endpointTokenMap, null);
@@ -100,7 +112,7 @@
         }
     }
 
-    public EndPoint[] getEndPoint(String key)
+    public InetAddress[] getEndPoint(String key)
     {
         return nodePicker_.getReadStorageEndPoints(partitioner_.getToken(key));
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Oct 21 18:26:02 2009
@@ -35,6 +35,7 @@
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.*;
+import java.net.InetAddress;
 
 public class DatabaseDescriptor
 {
@@ -52,12 +53,12 @@
     private static int controlPort_ = 7001;
     private static int thriftPort_ = 9160;
     private static boolean thriftFramed_ = false;
-    private static String listenAddress_; // leave null so we can fall through to getLocalHost
-    private static String thriftAddress_;
+    private static InetAddress listenAddress_; // leave null so we can fall through to getLocalHost
+    private static InetAddress thriftAddress_;
     private static String clusterName_ = "Test";
     private static int replicationFactor_ = 3;
     private static long rpcTimeoutInMillis_ = 2000;
-    private static Set<String> seeds_ = new HashSet<String>();
+    private static Set<InetAddress> seeds_ = new HashSet<InetAddress>();
     /* Keeps the list of data file directories */
     private static String[] dataFileDirectories_;
     /* Current index into the above list of directories */
@@ -276,12 +277,12 @@
             /* Local IP or hostname to bind services to */
             String listenAddress = xmlUtils.getNodeValue("/Storage/ListenAddress");
             if ( listenAddress != null)
-                listenAddress_ = listenAddress;
+                listenAddress_ = InetAddress.getByName(listenAddress);
             
             /* Local IP or hostname to bind thrift server to */
             String thriftAddress = xmlUtils.getNodeValue("/Storage/ThriftAddress");
             if ( thriftAddress != null )
-                thriftAddress_ = thriftAddress;
+                thriftAddress_ = InetAddress.getByName(thriftAddress);
             
             /* UDP port for control messages */
             port = xmlUtils.getNodeValue("/Storage/ControlPort");
@@ -538,7 +539,7 @@
             }
             for( int i = 0; i < seeds.length; ++i )
             {
-                seeds_.add( seeds[i] );
+                seeds_.add(InetAddress.getByName(seeds[i]));
             }
         }
         catch (ConfigurationException e)
@@ -843,7 +844,7 @@
         logFileDirectory_ = logLocation;
     }
 
-    public static Set<String> getSeeds()
+    public static Set<InetAddress> getSeeds()
     {
         return seeds_;
     }
@@ -918,12 +919,12 @@
         }
     }
 
-    public static String getListenAddress()
+    public static InetAddress getListenAddress()
     {
         return listenAddress_;
     }
     
-    public static String getThriftAddress()
+    public static InetAddress getThriftAddress()
     {
         return thriftAddress_;
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Oct 21 18:26:02 2009
@@ -36,7 +36,7 @@
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.*;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
@@ -272,7 +272,7 @@
      * This method forces a compaction of the SSTables on disk. We wait
      * for the process to complete by waiting on a future pointer.
     */
-    List<SSTableReader> forceAntiCompaction(List<Range> ranges, EndPoint target)
+    List<SSTableReader> forceAntiCompaction(List<Range> ranges, InetAddress target)
     {
         assert ranges != null;
         Future<List<SSTableReader>> futurePtr = CompactionManager.instance().submit(ColumnFamilyStore.this, ranges, target);
@@ -704,7 +704,7 @@
         return maxFile;
     }
 
-    List<SSTableReader> doAntiCompaction(List<Range> ranges, EndPoint target) throws IOException
+    List<SSTableReader> doAntiCompaction(List<Range> ranges, InetAddress target) throws IOException
     {
         return doFileAntiCompaction(ssTables_.getSSTables(), ranges, target);
     }
@@ -757,7 +757,7 @@
      * @return
      * @throws IOException
      */
-    List<SSTableReader> doFileAntiCompaction(Collection<SSTableReader> sstables, Collection<Range> ranges, EndPoint target) throws IOException
+    List<SSTableReader> doFileAntiCompaction(Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress target) throws IOException
     {
         logger_.info("AntiCompacting [" + StringUtils.join(sstables, ",") + "]");
         // Calculate the expected compacted filesize

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed Oct 21 18:26:02 2009
@@ -34,7 +34,7 @@
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 
 public class CompactionManager implements CompactionManagerMBean
 {
@@ -75,9 +75,9 @@
     {
         private ColumnFamilyStore columnFamilyStore_;
         private List<Range> ranges_;
-        private EndPoint target_;
+        private InetAddress target_;
 
-        FileCompactor2(ColumnFamilyStore columnFamilyStore, List<Range> ranges, EndPoint target)
+        FileCompactor2(ColumnFamilyStore columnFamilyStore, List<Range> ranges, InetAddress target)
         {
             columnFamilyStore_ = columnFamilyStore;
             ranges_ = ranges;
@@ -187,7 +187,7 @@
         compactor_.submit(new CleanupCompactor(columnFamilyStore));
     }
 
-    public Future<List<SSTableReader>> submit(ColumnFamilyStore columnFamilyStore, List<Range> ranges, EndPoint target)
+    public Future<List<SSTableReader>> submit(ColumnFamilyStore columnFamilyStore, List<Range> ranges, InetAddress target)
     {
         return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges, target) );
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java Wed Oct 21 18:26:02 2009
@@ -26,8 +26,8 @@
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
 
 import org.apache.log4j.Logger;
 
@@ -52,7 +52,7 @@
             {
                 dos.writeUTF(sstable.getFilename());
             }
-            Message response = message.getReply( StorageService.getLocalStorageEndPoint(), bos.toByteArray());
+            Message response = message.getReply(FBUtilities.getLocalAddress(), bos.toByteArray());
             MessagingService.instance().sendOneWay(response, message.getFrom());
         }
         catch (IOException ex)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Wed Oct 21 18:26:02 2009
@@ -21,6 +21,7 @@
 import java.util.Collection;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.Arrays;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Lock;
@@ -32,7 +33,7 @@
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.*;
@@ -100,9 +101,8 @@
         return instance_;
     }
 
-    private static boolean sendMessage(String endpointAddress, String tableName, String key) throws DigestMismatchException, TimeoutException, IOException, InvalidRequestException
+    private static boolean sendMessage(InetAddress endPoint, String tableName, String key) throws DigestMismatchException, TimeoutException, IOException, InvalidRequestException
     {
-        EndPoint endPoint = new EndPoint(endpointAddress, DatabaseDescriptor.getStoragePort());
         if (!FailureDetector.instance().isAlive(endPoint))
         {
             return false;
@@ -113,7 +113,7 @@
         RowMutation rm = new RowMutation(tableName, row);
         Message message = rm.makeRowMutationMessage();
         QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
-        MessagingService.instance().sendRR(message, new EndPoint[]{ endPoint }, quorumResponseHandler);
+        MessagingService.instance().sendRR(message, new InetAddress[]{ endPoint }, quorumResponseHandler);
 
         return quorumResponseHandler.get();
     }
@@ -161,8 +161,7 @@
                 int deleted = 0;
                 for (IColumn endpoint : endpoints)
                 {
-                    String endpointStr = new String(endpoint.name(), "UTF-8");
-                    if (sendMessage(endpointStr, tableName, keyStr))
+                    if (sendMessage(InetAddress.getByAddress(endpoint.name()), tableName, keyStr))
                     {
                         deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), System.currentTimeMillis());
                         deleted++;
@@ -181,12 +180,12 @@
           logger_.debug("Finished deliverAllHints");
     }
 
-    private static void deliverHintsToEndpoint(EndPoint endPoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
+    private static void deliverHintsToEndpoint(InetAddress endPoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
     {
         if (logger_.isDebugEnabled())
           logger_.debug("Started hinted handoff for endPoint " + endPoint);
 
-        String targetEPBytes = endPoint.getHost();
+        byte[] targetEPBytes = endPoint.getAddress();
         // 1. Scan through all the keys that we need to handoff
         // 2. For each key read the list of recipients if the endpoint matches send
         // 3. Delete that recipient from the key if write was successful
@@ -206,7 +205,7 @@
                 Collection<IColumn> endpoints = keyColumn.getSubColumns();
                 for (IColumn hintEndPoint : endpoints)
                 {
-                    if (new String(hintEndPoint.name(), "UTF-8").equals(targetEPBytes) && sendMessage(endPoint.getHost(), null, keyStr))
+                    if (Arrays.equals(hintEndPoint.name(), targetEPBytes) && sendMessage(endPoint, null, keyStr))
                     {
                         if (endpoints.size() == 1)
                         {
@@ -256,7 +255,7 @@
      * When we learn that some endpoint is back up we deliver the data
      * to him via an event driven mechanism.
     */
-    public void deliverHints(final EndPoint to)
+    public void deliverHints(final InetAddress to)
     {
         Runnable r = new Runnable()
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java Wed Oct 21 18:26:02 2009
@@ -22,16 +22,13 @@
 import java.io.IOException;
 import java.io.DataInputStream;
 import java.util.Arrays;
-import java.util.List;
-import java.util.Collections;
-
-import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class RangeCommand
 {
@@ -56,7 +53,7 @@
     {
         DataOutputBuffer dob = new DataOutputBuffer();
         serializer.serialize(this, dob);
-        return new Message(StorageService.getLocalStorageEndPoint(),
+        return new Message(FBUtilities.getLocalAddress(),
                            StorageService.readStage_,
                            StorageService.rangeVerbHandler_,
                            Arrays.copyOf(dob.getData(), dob.getLength()));

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java Wed Oct 21 18:26:02 2009
@@ -26,7 +26,7 @@
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class RangeReply
 {
@@ -49,7 +49,7 @@
             dob.writeUTF(key);
         }
         byte[] data = Arrays.copyOf(dob.getData(), dob.getLength());
-        return originalMessage.getReply(StorageService.getLocalStorageEndPoint(), data);
+        return originalMessage.getReply(FBUtilities.getLocalAddress(), data);
     }
 
     @Override

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Wed Oct 21 18:26:02 2009
@@ -30,6 +30,7 @@
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.utils.FBUtilities;
 
 
 public abstract class ReadCommand
@@ -52,7 +53,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         ReadCommand.serializer().serialize(this, dos);
-        return new Message(StorageService.getLocalStorageEndPoint(), StorageService.readStage_, StorageService.readVerbHandler_, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), StorageService.readStage_, StorageService.readVerbHandler_, bos.toByteArray());
     }
 
     public final QueryPath queryPath;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Wed Oct 21 18:26:02 2009
@@ -22,10 +22,12 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
 import org.apache.commons.lang.ArrayUtils;
 
 
@@ -53,7 +55,7 @@
     	ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         ReadResponse.serializer().serialize(readResponse, dos);
-        Message message = new Message(StorageService.getLocalStorageEndPoint(), MessagingService.responseStage_, MessagingService.responseVerbHandler_, bos.toByteArray());         
+        Message message = new Message(FBUtilities.getLocalAddress(), MessagingService.responseStage_, MessagingService.responseVerbHandler_, bos.toByteArray());
         return message;
     }
 	

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Wed Oct 21 18:26:02 2009
@@ -24,7 +24,7 @@
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -97,7 +97,7 @@
             byte[] bytes = new byte[readCtx.bufOut_.getLength()];
             System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
 
-            Message response = message.getReply(StorageService.getLocalStorageEndPoint(), bytes);
+            Message response = message.getReply(FBUtilities.getLocalAddress(), bytes);
             if (logger_.isDebugEnabled())
               logger_.debug("Read key " + readCommand.key + "; sending response to " + message.getMessageId() + "@" + message.getFrom());
             MessagingService.instance().sendOneWay(response, message.getFrom());
@@ -116,9 +116,9 @@
     
     private void doReadRepair(Row row, ReadCommand readCommand)
     {
-        List<EndPoint> endpoints = StorageService.instance().getLiveReadStorageEndPoints(readCommand.key);
-        /* Remove the local storage endpoint from the list. */ 
-        endpoints.remove( StorageService.getLocalStorageEndPoint() );
+        List<InetAddress> endpoints = StorageService.instance().getLiveReadStorageEndPoints(readCommand.key);
+        /* Remove the local storage endpoint from the list. */
+        endpoints.remove(FBUtilities.getLocalAddress());
             
         if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
             StorageService.instance().doConsistencyCheck(row, endpoints, readCommand);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Wed Oct 21 18:26:02 2009
@@ -36,7 +36,7 @@
 
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.utils.FBUtilities;
@@ -105,9 +105,9 @@
         return modifications_.values();
     }
 
-    void addHints(String key, String host) throws IOException
+    void addHints(String key, byte[] host) throws IOException
     {
-        QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, key.getBytes("UTF-8"), host.getBytes("UTF-8"));
+        QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, key.getBytes("UTF-8"), host);
         add(path, ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis());
     }
 
@@ -224,9 +224,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         serializer().serialize(this, dos);
-        EndPoint local = StorageService.getLocalStorageEndPoint();
-        EndPoint from = (local != null) ? local : new EndPoint(FBUtilities.getHostAddress(), 7000);
-        return new Message(from, StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
     }
 
     public static RowMutation getRowMutation(String table, String key, Map<String, List<ColumnOrSuperColumn>> cfmap)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java Wed Oct 21 18:26:02 2009
@@ -27,7 +27,6 @@
 import javax.xml.bind.annotation.XmlElement;
 
 import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
@@ -52,9 +51,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         RowMutationMessage.serializer().serialize(this, dos);
-        EndPoint local = StorageService.getLocalStorageEndPoint();
-        EndPoint from = ( local != null ) ? local : new EndPoint(FBUtilities.getHostAddress(), 7000);
-        return new Message(from, StorageService.mutationStage_, verbHandlerName, bos.toByteArray());         
+        return new Message(FBUtilities.getLocalAddress(), StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
     }
     
     @XmlElement(name="RowMutation")

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Wed Oct 21 18:26:02 2009
@@ -21,7 +21,7 @@
 import java.io.*;
 
 import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 
@@ -49,12 +49,12 @@
             byte[] hintedBytes = message.getHeader(RowMutation.HINT);
             if ( hintedBytes != null && hintedBytes.length > 0 )
             {
-            	EndPoint hint = EndPoint.getByAddress(hintedBytes);
+            	InetAddress hint = InetAddress.getByAddress(hintedBytes);
                 if (logger_.isDebugEnabled())
                   logger_.debug("Adding hint for " + hint);
                 /* add necessary hints to this mutation */
                 RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.getTable());
-                hintedMutation.addHints(rm.key(), hint.getHost());
+                hintedMutation.addHints(rm.key(), hintedBytes);
                 hintedMutation.apply();
             }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Wed Oct 21 18:26:02 2009
@@ -32,7 +32,7 @@
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 
 public class SystemTable
 {
@@ -60,11 +60,11 @@
     /**
      * Record token being used by another node
      */
-    public static synchronized void updateToken(EndPoint ep, Token token) throws IOException
+    public static synchronized void updateToken(InetAddress ep, Token token) throws IOException
     {
         IPartitioner p = StorageService.getPartitioner();
         ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, STATUS_CF);
-        cf.addColumn(new Column(ep.getHost().getBytes("UTF-8"), p.getTokenFactory().toByteArray(token), System.currentTimeMillis()));
+        cf.addColumn(new Column(ep.getAddress(), p.getTokenFactory().toByteArray(token), System.currentTimeMillis()));
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
         rm.add(cf);
         rm.apply();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Oct 21 18:26:02 2009
@@ -30,7 +30,7 @@
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.io.SSTableWriter;
 import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.io.IStreamComplete;
@@ -301,7 +301,7 @@
      * do a complete compaction since we can figure out based on the ranges
      * whether the files need to be split.
     */
-    public List<SSTableReader> forceAntiCompaction(List<Range> ranges, EndPoint target)
+    public List<SSTableReader> forceAntiCompaction(List<Range> ranges, InetAddress target)
     {
         List<SSTableReader> allResults = new ArrayList<SSTableReader>();
         Set<String> columnFamilies = tableMetadata_.getColumnFamilies();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java Wed Oct 21 18:26:02 2009
@@ -22,14 +22,10 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.Serializable;
-
-import javax.xml.bind.annotation.XmlElement;
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
 
 
 /*
@@ -51,7 +47,7 @@
     	ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         WriteResponse.serializer().serialize(writeResponseMessage, dos);
-        return original.getReply(StorageService.getLocalStorageEndPoint(), bos.toByteArray());
+        return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray());
     }
 
 	private final String table_;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Wed Oct 21 18:26:02 2009
@@ -31,13 +31,13 @@
  import java.io.IOException;
  import java.io.UnsupportedEncodingException;
  import java.io.File;
+ import java.net.InetAddress;
 
  import org.apache.log4j.Logger;
 
  import org.apache.commons.lang.ArrayUtils;
 
  import org.apache.cassandra.locator.TokenMetadata;
- import org.apache.cassandra.locator.AbstractReplicationStrategy;
  import org.apache.cassandra.net.*;
  import org.apache.cassandra.net.io.StreamContextManager;
  import org.apache.cassandra.net.io.IStreamComplete;
@@ -77,12 +77,12 @@
     private static final ExecutorService bootstrapExecutor_ = new DebuggableThreadPoolExecutor("BOOT-STRAPPER");
 
     /* endpoints that need to be bootstrapped */
-    protected EndPoint[] targets_ = new EndPoint[0];
+    protected InetAddress[] targets_ = new InetAddress[0];
     /* tokens of the nodes being bootstrapped. */
     protected final Token[] tokens_;
     protected TokenMetadata tokenMetadata_ = null;
 
-    public BootStrapper(EndPoint[] target, Token... token)
+    public BootStrapper(InetAddress[] target, Token... token)
     {
         targets_ = target;
         tokens_ = token;
@@ -116,7 +116,7 @@
     Map<Range, List<BootstrapSourceTarget>> getRangesWithSourceTarget()
     {
         /* copy the token to endpoint map */
-        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        Map<Token, InetAddress> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
         /* remove the tokens associated with the endpoints being bootstrapped */                
         for (Token token : tokens_)
         {
@@ -133,9 +133,9 @@
         */                
         Map<Range, List<Range>> splitRanges = LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRanges, tokens_);                                                      
         /* Calculate the list of nodes that handle the old ranges */
-        Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges, tokenToEndPointMap);
+        Map<Range, List<InetAddress>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges, tokenToEndPointMap);
         /* Mapping of split ranges to the list of endpoints responsible for the range */                
-        Map<Range, List<EndPoint>> replicasForSplitRanges = new HashMap<Range, List<EndPoint>>();                                
+        Map<Range, List<InetAddress>> replicasForSplitRanges = new HashMap<Range, List<InetAddress>>();
         Set<Range> rangesSplit = splitRanges.keySet();                
         for ( Range splitRange : rangesSplit )
         {
@@ -151,11 +151,11 @@
         for ( Range splitRange : rangesSplit )
         {
             List<Range> subRanges = splitRanges.get(splitRange);
-            List<EndPoint> replicas = replicasForSplitRanges.get(splitRange);
+            List<InetAddress> replicas = replicasForSplitRanges.get(splitRange);
             for ( Range subRange : subRanges )
             {
                 /* Make sure we clone or else we are hammered. */
-                oldRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
+                oldRangeToEndPointMap.put(subRange, new ArrayList<InetAddress>(replicas));
             }
         }                
         
@@ -166,15 +166,15 @@
         if (logger_.isDebugEnabled())
           logger_.debug("Total number of new ranges " + newRanges.length);
         /* Calculate the list of nodes that handle the new ranges */
-        Map<Range, List<EndPoint>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges);
+        Map<Range, List<InetAddress>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges);
         /* Calculate ranges that need to be sent and from whom to where */
         Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = LeaveJoinProtocolHelper.getRangeSourceTargetInfo(oldRangeToEndPointMap, newRangeToEndPointMap);
         return rangesWithSourceTarget;
     }
 
-    private static Token<?> getBootstrapTokenFrom(EndPoint maxEndpoint)
+    private static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint)
     {
-        Message message = new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.bootstrapTokenVerbHandler_, ArrayUtils.EMPTY_BYTE_ARRAY);
+        Message message = new Message(FBUtilities.getLocalAddress(), "", StorageService.bootstrapTokenVerbHandler_, ArrayUtils.EMPTY_BYTE_ARRAY);
         BootstrapTokenCallback btc = new BootstrapTokenCallback();
         MessagingService.instance().sendRR(message, maxEndpoint, btc);
         return btc.getToken();
@@ -193,8 +193,8 @@
         if (DatabaseDescriptor.getInitialToken() == null)
         {
             double maxLoad = 0;
-            EndPoint maxEndpoint = null;
-            for (Map.Entry<EndPoint,Double> entry : slb.getLoadInfo().entrySet())
+            InetAddress maxEndpoint = null;
+            for (Map.Entry<InetAddress,Double> entry : slb.getLoadInfo().entrySet())
             {
                 if (maxEndpoint == null || entry.getValue() > maxLoad)
                 {
@@ -207,16 +207,15 @@
                 throw new RuntimeException("No bootstrap sources found");
             }
 
-            if (!maxEndpoint.equals(StorageService.getLocalStorageEndPoint()))
+            if (!maxEndpoint.equals(FBUtilities.getLocalAddress()))
             {
-                EndPoint maxStorageEndpoint = new EndPoint(maxEndpoint.getHost(), DatabaseDescriptor.getStoragePort());
-                Token<?> t = getBootstrapTokenFrom(maxStorageEndpoint);
+                Token<?> t = getBootstrapTokenFrom(maxEndpoint);
                 logger_.info("Setting token to " + t + " to assume load from " + maxEndpoint);
                 ss.updateToken(t);
             }
         }
 
-        BootStrapper bs = new BootStrapper(new EndPoint[] {StorageService.getLocalStorageEndPoint()}, ss.getLocalToken());
+        BootStrapper bs = new BootStrapper(new InetAddress[] { FBUtilities.getLocalAddress() }, ss.getLocalToken());
         bootstrapExecutor_.submit(bs);
         Gossiper.instance().addApplicationState(StorageService.BOOTSTRAP_MODE, new ApplicationState(""));
     }
@@ -231,7 +230,7 @@
             Message response;
             try
             {
-                response = message.getReply(ss.getLocalStorageEndPoint(), tokens.get(1).getBytes("UTF-8"));
+                response = message.getReply(FBUtilities.getLocalAddress(), tokens.get(1).getBytes("UTF-8"));
             }
             catch (UnsupportedEncodingException e)
             {
@@ -319,14 +318,14 @@
                     if (logger_.isDebugEnabled())
                       logger_.debug("Received Data from  : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
                     streamContext.setTargetFile(file);
-                    addStreamContext(message.getFrom().getHost(), streamContext, streamStatus);
+                    addStreamContext(message.getFrom(), streamContext, streamStatus);
                 }
 
-                StreamContextManager.registerStreamCompletionHandler(message.getFrom().getHost(), new BootstrapCompletionHandler());
+                StreamContextManager.registerStreamCompletionHandler(message.getFrom(), new BootstrapCompletionHandler());
                 /* Send a bootstrap initiation done message to execute on default stage. */
                 if (logger_.isDebugEnabled())
                   logger_.debug("Sending a bootstrap initiate done message ...");
-                Message doneMessage = new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateDoneVerbHandler_, new byte[0] );
+                Message doneMessage = new Message(FBUtilities.getLocalAddress(), "", StorageService.bootStrapInitiateDoneVerbHandler_, new byte[0] );
                 MessagingService.instance().sendOneWay(doneMessage, message.getFrom());
             }
             catch ( IOException ex )
@@ -384,7 +383,7 @@
             return fileNames;
         }
 
-        private void addStreamContext(String host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus)
+        private void addStreamContext(InetAddress host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus)
         {
             if (logger_.isDebugEnabled())
               logger_.debug("Adding stream context " + streamContext + " for " + host + " ...");
@@ -400,7 +399,7 @@
     */
     private static class BootstrapCompletionHandler implements IStreamComplete
     {
-        public void onStreamCompletion(String host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException
+        public void onStreamCompletion(InetAddress host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException
         {
             /* Parse the stream context and the file to the list of SSTables in the associated Column Family Store. */
             if (streamContext.getTargetFile().contains("-Data.db"))
@@ -426,16 +425,15 @@
                 }
             }
 
-            EndPoint to = new EndPoint(host, DatabaseDescriptor.getStoragePort());
             if (logger_.isDebugEnabled())
-              logger_.debug("Sending a bootstrap terminate message with " + streamStatus + " to " + to);
+              logger_.debug("Sending a bootstrap terminate message with " + streamStatus + " to " + host);
             /* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
             StreamContextManager.StreamStatusMessage streamStatusMessage = new StreamContextManager.StreamStatusMessage(streamStatus);
             Message message = StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
-            MessagingService.instance().sendOneWay(message, to);
+            MessagingService.instance().sendOneWay(message, host);
             /* If we're done with everything for this host, remove from bootstrap sources */
-            if (StreamContextManager.isDone(to.getHost()))
-                StorageService.instance().removeBootstrapSource(to);
+            if (StreamContextManager.isDone(host))
+                StorageService.instance().removeBootstrapSource(host);
         }
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java Wed Oct 21 18:26:02 2009
@@ -24,12 +24,11 @@
 import java.io.IOException;
 import java.io.Serializable;
 
-import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.io.StreamContextManager;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.net.io.*;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class BootstrapInitiateMessage implements Serializable
 {
@@ -50,7 +49,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         BootstrapInitiateMessage.serializer().serialize(biMessage, dos);
-        return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateVerbHandler_, bos.toByteArray() );
+        return new Message(FBUtilities.getLocalAddress(), "", StorageService.bootStrapInitiateVerbHandler_, bos.toByteArray() );
     }
     
     protected StreamContextManager.StreamContext[] streamContexts_ = new StreamContextManager.StreamContext[0];

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java Wed Oct 21 18:26:02 2009
@@ -26,7 +26,7 @@
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 
 
 
@@ -50,10 +50,10 @@
         return serializer_;
     }
     
-    protected EndPoint target_;
+    protected InetAddress target_;
     protected List<Range> ranges_;
     
-    BootstrapMetadata(EndPoint target, List<Range> ranges)
+    BootstrapMetadata(InetAddress target, List<Range> ranges)
     {
         target_ = target;
         ranges_ = ranges;
@@ -87,7 +87,7 @@
 
     public BootstrapMetadata deserialize(DataInputStream dis) throws IOException
     {            
-        EndPoint target = CompactEndPointSerializationHelper.deserialize(dis);
+        InetAddress target = CompactEndPointSerializationHelper.deserialize(dis);
         int size = dis.readInt();
         List<Range> ranges = (size == 0) ? null : new ArrayList<Range>();
         for( int i = 0; i < size; ++i )

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java Wed Oct 21 18:26:02 2009
@@ -26,10 +26,10 @@
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
 
 
-
-/**
+ /**
  * This class encapsulates the message that needs to be sent
  * to nodes that handoff data. The message contains information
  * about the node to be bootstrapped and the ranges with which
@@ -53,7 +53,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         BootstrapMetadataMessage.serializer().serialize(bsMetadataMessage, dos);
-        return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootstrapMetadataVerbHandler_, bos.toByteArray() );
+        return new Message(FBUtilities.getLocalAddress(), "", StorageService.bootstrapMetadataVerbHandler_, bos.toByteArray() );
     }        
     
     protected BootstrapMetadata[] bsMetadata_ = new BootstrapMetadata[0];

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java Wed Oct 21 18:26:02 2009
@@ -27,7 +27,7 @@
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -93,7 +93,7 @@
      * locally for each range and then stream them using
      * the Bootstrap protocol to the target endpoint.
     */
-    private void doTransfer(EndPoint target, List<Range> ranges) throws IOException
+    private void doTransfer(InetAddress target, List<Range> ranges) throws IOException
     {
         if ( ranges.size() == 0 )
         {
@@ -146,7 +146,7 @@
      * Stream the files in the bootstrap directory over to the
      * node being bootstrapped.
     */
-    private void doHandoff(EndPoint target, List<String> fileList, String table) throws IOException
+    private void doHandoff(InetAddress target, List<String> fileList, String table) throws IOException
     {
         List<File> filesList = new ArrayList<File>();
         for(String file : fileList)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java Wed Oct 21 18:26:02 2009
@@ -18,7 +18,7 @@
 
 package org.apache.cassandra.dht;
 
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 
 /**
  * This class encapsulates who is the source and the
@@ -26,10 +26,10 @@
  */
 class BootstrapSourceTarget
 {
-    protected EndPoint source_;
-    protected EndPoint target_;
+    protected InetAddress source_;
+    protected InetAddress target_;
     
-    BootstrapSourceTarget(EndPoint source, EndPoint target)
+    BootstrapSourceTarget(InetAddress source, InetAddress target)
     {
         source_ = source;
         target_ = target;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java Wed Oct 21 18:26:02 2009
@@ -28,7 +28,7 @@
 
  import org.apache.log4j.Logger;
 
- import org.apache.cassandra.net.EndPoint;
+ import java.net.InetAddress;
  import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
@@ -100,7 +100,7 @@
         return splitRanges;
     }
     
-    protected static Map<Range, List<BootstrapSourceTarget>> getRangeSourceTargetInfo(Map<Range, List<EndPoint>> oldRangeToEndPointMap, Map<Range, List<EndPoint>> newRangeToEndPointMap)
+    protected static Map<Range, List<BootstrapSourceTarget>> getRangeSourceTargetInfo(Map<Range, List<InetAddress>> oldRangeToEndPointMap, Map<Range, List<InetAddress>> newRangeToEndPointMap)
     {
         Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = new HashMap<Range, List<BootstrapSourceTarget>>();
         /*
@@ -114,12 +114,12 @@
         {
             if (logger_.isDebugEnabled())
               logger_.debug("Attempting to figure out the dudes who are bumped out for " + range + " ...");
-            List<EndPoint> oldEndPoints = oldRangeToEndPointMap.get(range);
-            List<EndPoint> newEndPoints = newRangeToEndPointMap.get(range);
+            List<InetAddress> oldEndPoints = oldRangeToEndPointMap.get(range);
+            List<InetAddress> newEndPoints = newRangeToEndPointMap.get(range);
             if ( newEndPoints != null )
             {                        
-                List<EndPoint> newEndPoints2 = new ArrayList<EndPoint>(newEndPoints);
-                for ( EndPoint newEndPoint : newEndPoints2 )
+                List<InetAddress> newEndPoints2 = new ArrayList<InetAddress>(newEndPoints);
+                for ( InetAddress newEndPoint : newEndPoints2 )
                 {
                     if ( oldEndPoints.contains(newEndPoint) )
                     {
@@ -137,8 +137,8 @@
         }
         for ( Range range : oldRangeSet )
         {                    
-            List<EndPoint> oldEndPoints = oldRangeToEndPointMap.get(range);
-            List<EndPoint> newEndPoints = newRangeToEndPointMap.get(range);
+            List<InetAddress> oldEndPoints = oldRangeToEndPointMap.get(range);
+            List<InetAddress> newEndPoints = newRangeToEndPointMap.get(range);
             List<BootstrapSourceTarget> srcTarget = rangesWithSourceTarget.get(range);
             if ( srcTarget == null )
             {
@@ -146,7 +146,7 @@
                 rangesWithSourceTarget.put(range, srcTarget);
             }
             int i = 0;
-            for ( EndPoint oldEndPoint : oldEndPoints )
+            for ( InetAddress oldEndPoint : oldEndPoints )
             {                        
                 srcTarget.add( new BootstrapSourceTarget(oldEndPoint, newEndPoints.get(i++)) );
             }
@@ -160,7 +160,7 @@
     */
     protected static void assignWork(Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget) throws IOException
     {
-        Map<EndPoint, Map<EndPoint, List<Range>>> rangeInfo = getWorkMap(rangesWithSourceTarget);
+        Map<InetAddress, Map<InetAddress, List<Range>>> rangeInfo = getWorkMap(rangesWithSourceTarget);
         sendMessagesToBootstrapSources(rangeInfo);
     }
     
@@ -168,15 +168,15 @@
      * This method takes the Src -> (Tgt-> List of ranges) maps and retains those entries 
      * that are relevant to bootstrapping the target endpoint
      */
-    protected static Map<EndPoint, Map<EndPoint, List<Range>>>
-    filterRangesForTargetEndPoint(Map<EndPoint, Map<EndPoint, List<Range>>> rangeInfo, EndPoint targetEndPoint)
+    protected static Map<InetAddress, Map<InetAddress, List<Range>>>
+    filterRangesForTargetEndPoint(Map<InetAddress, Map<InetAddress, List<Range>>> rangeInfo, InetAddress targetEndPoint)
     {
-        Map<EndPoint, Map<EndPoint, List<Range>>> filteredMap = new HashMap<EndPoint, Map<EndPoint,List<Range>>>();
-        for (Map.Entry<EndPoint, Map<EndPoint, List<Range>>> e: rangeInfo.entrySet())
+        Map<InetAddress, Map<InetAddress, List<Range>>> filteredMap = new HashMap<InetAddress, Map<InetAddress,List<Range>>>();
+        for (Map.Entry<InetAddress, Map<InetAddress, List<Range>>> e: rangeInfo.entrySet())
         {
-            EndPoint source = e.getKey();
-            Map<EndPoint, List<Range>> targets = e.getValue();
-            Map<EndPoint, List<Range>> filteredTargets = new HashMap<EndPoint, List<Range>>();
+            InetAddress source = e.getKey();
+            Map<InetAddress, List<Range>> targets = e.getValue();
+            Map<InetAddress, List<Range>> filteredTargets = new HashMap<InetAddress, List<Range>>();
             if (targets.get(targetEndPoint) != null)
                 filteredTargets.put(targetEndPoint, targets.get(targetEndPoint));
             if (filteredTargets.size() > 0)
@@ -185,16 +185,16 @@
         return filteredMap;
     }
 
-    private static void sendMessagesToBootstrapSources(Map<EndPoint, Map<EndPoint, List<Range>>> rangeInfo) throws IOException
+    private static void sendMessagesToBootstrapSources(Map<InetAddress, Map<InetAddress, List<Range>>> rangeInfo) throws IOException
     {
-        Set<EndPoint> sources = rangeInfo.keySet();
-        for ( EndPoint source : sources )
+        Set<InetAddress> sources = rangeInfo.keySet();
+        for ( InetAddress source : sources )
         {
-            Map<EndPoint, List<Range>> targetRangesMap = rangeInfo.get(source);
-            Set<EndPoint> targets = targetRangesMap.keySet();
+            Map<InetAddress, List<Range>> targetRangesMap = rangeInfo.get(source);
+            Set<InetAddress> targets = targetRangesMap.keySet();
             List<BootstrapMetadata> bsmdList = new ArrayList<BootstrapMetadata>();
             
-            for ( EndPoint target : targets )
+            for ( InetAddress target : targets )
             {
                 List<Range> rangeForTarget = targetRangesMap.get(target);
                 BootstrapMetadata bsMetadata = new BootstrapMetadata(target, rangeForTarget);
@@ -211,14 +211,14 @@
         }
     }
 
-    static Map<EndPoint, Map<EndPoint, List<Range>>> getWorkMap(
+    static Map<InetAddress, Map<InetAddress, List<Range>>> getWorkMap(
             Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget)
     {
         /*
          * Map whose key is the source node and the value is a map whose key is the
          * target and value is the list of ranges to be sent to it. 
         */
-        Map<EndPoint, Map<EndPoint, List<Range>>> rangeInfo = new HashMap<EndPoint, Map<EndPoint, List<Range>>>();
+        Map<InetAddress, Map<InetAddress, List<Range>>> rangeInfo = new HashMap<InetAddress, Map<InetAddress, List<Range>>>();
         Set<Range> ranges = rangesWithSourceTarget.keySet();
         
         for ( Range range : ranges )
@@ -226,10 +226,10 @@
             List<BootstrapSourceTarget> rangeSourceTargets = rangesWithSourceTarget.get(range);
             for ( BootstrapSourceTarget rangeSourceTarget : rangeSourceTargets )
             {
-                Map<EndPoint, List<Range>> targetRangeMap = rangeInfo.get(rangeSourceTarget.source_);
+                Map<InetAddress, List<Range>> targetRangeMap = rangeInfo.get(rangeSourceTarget.source_);
                 if ( targetRangeMap == null )
                 {
-                    targetRangeMap = new HashMap<EndPoint, List<Range>>();
+                    targetRangeMap = new HashMap<InetAddress, List<Range>>();
                     rangeInfo.put(rangeSourceTarget.source_, targetRangeMap);
                 }
                 List<Range> rangesToGive = targetRangeMap.get(rangeSourceTarget.target_);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java Wed Oct 21 18:26:02 2009
@@ -29,7 +29,7 @@
  import org.apache.log4j.Logger;
 
  import org.apache.cassandra.locator.TokenMetadata;
- import org.apache.cassandra.net.EndPoint;
+ import java.net.InetAddress;
  import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.LogUtil;
 
@@ -45,13 +45,13 @@
     private static Logger logger_ = Logger.getLogger(LeaveJoinProtocolImpl.class);    
     
     /* endpoints that are to be moved. */
-    protected EndPoint[] targets_ = new EndPoint[0];
+    protected InetAddress[] targets_ = new InetAddress[0];
     /* position where they need to be moved */
     protected final Token[] tokens_;
     /* token metadata information */
     protected TokenMetadata tokenMetadata_ = null;
 
-    public LeaveJoinProtocolImpl(EndPoint[] targets, Token[] tokens)
+    public LeaveJoinProtocolImpl(InetAddress[] targets, Token[] tokens)
     {
         targets_ = targets;
         tokens_ = tokens;
@@ -65,16 +65,16 @@
             if (logger_.isDebugEnabled())
               logger_.debug("Beginning leave/join process for ...");                                                               
             /* copy the token to endpoint map */
-            Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+            Map<Token, InetAddress> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
             /* copy the endpoint to token map */
-            Map<EndPoint, Token> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
+            Map<InetAddress, Token> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
             
             Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
             Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
             if (logger_.isDebugEnabled())
               logger_.debug("Total number of old ranges " + oldRanges.length);
             /* Calculate the list of nodes that handle the old ranges */
-            Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges);
+            Map<Range, List<InetAddress>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges);
             
             /* Remove the tokens of the nodes leaving the ring */
             Set<Token> tokens = getTokensForLeavingNodes();
@@ -98,7 +98,7 @@
                 tokenToEndPointMap.put(tokens_[i], targets_[i]);
             }            
             /* Calculate the list of nodes that handle the new ranges */            
-            Map<Range, List<EndPoint>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges, tokenToEndPointMap);
+            Map<Range, List<InetAddress>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges, tokenToEndPointMap);
             /* Remove any expanded ranges and replace them with ranges whose aggregate is the expanded range in the new configuration. */
             removeExpandedRangesFromNewConfiguration(newRangeToEndPointMap, expandedRangeToOldRangeMap);
             /* Calculate ranges that need to be sent and from whom to where */
@@ -132,7 +132,7 @@
      * @param rangesAfterNodesJoin ranges after the nodes have joined at
      *        their respective position.
      */
-    private void addSplitRangesToOldConfiguration(Map<Range, List<EndPoint>> oldRangeToEndPointMap, Range[] rangesAfterNodesJoin)
+    private void addSplitRangesToOldConfiguration(Map<Range, List<InetAddress>> oldRangeToEndPointMap, Range[] rangesAfterNodesJoin)
     {
         /* 
          * Find the ranges that are split. Maintain a mapping between
@@ -140,7 +140,7 @@
         */                
         Map<Range, List<Range>> splitRanges = LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRangeToEndPointMap.keySet().toArray( new Range[0] ), tokens_);
         /* Mapping of split ranges to the list of endpoints responsible for the range */                
-        Map<Range, List<EndPoint>> replicasForSplitRanges = new HashMap<Range, List<EndPoint>>();                                
+        Map<Range, List<InetAddress>> replicasForSplitRanges = new HashMap<Range, List<InetAddress>>();
         Set<Range> rangesSplit = splitRanges.keySet();                
         for ( Range splitRange : rangesSplit )
         {
@@ -156,11 +156,11 @@
         for ( Range splitRange : rangesSplit )
         {
             List<Range> subRanges = splitRanges.get(splitRange);
-            List<EndPoint> replicas = replicasForSplitRanges.get(splitRange);
+            List<InetAddress> replicas = replicasForSplitRanges.get(splitRange);
             for ( Range subRange : subRanges )
             {
                 /* Make sure we clone or else we are hammered. */
-                oldRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
+                oldRangeToEndPointMap.put(subRange, new ArrayList<InetAddress>(replicas));
             }
         }
     }
@@ -175,10 +175,10 @@
      * @param expandedRangeToOldRangeMap mapping between the expanded ranges
      *        and the ranges whose aggregate is the expanded range.
      */
-    private void removeExpandedRangesFromNewConfiguration(Map<Range, List<EndPoint>> newRangeToEndPointMap, Map<Range, List<Range>> expandedRangeToOldRangeMap)
+    private void removeExpandedRangesFromNewConfiguration(Map<Range, List<InetAddress>> newRangeToEndPointMap, Map<Range, List<Range>> expandedRangeToOldRangeMap)
     {
         /* Get the replicas for the expanded ranges */
-        Map<Range, List<EndPoint>> replicasForExpandedRanges = new HashMap<Range, List<EndPoint>>();
+        Map<Range, List<InetAddress>> replicasForExpandedRanges = new HashMap<Range, List<InetAddress>>();
         Set<Range> expandedRanges = expandedRangeToOldRangeMap.keySet();
         for ( Range expandedRange : expandedRanges )
         {            
@@ -189,10 +189,10 @@
         for ( Range expandedRange : expandedRanges )
         {
             List<Range> subRanges = expandedRangeToOldRangeMap.get(expandedRange);
-            List<EndPoint> replicas = replicasForExpandedRanges.get(expandedRange);          
+            List<InetAddress> replicas = replicasForExpandedRanges.get(expandedRange);
             for ( Range subRange : subRanges )
             {
-                newRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
+                newRangeToEndPointMap.put(subRange, new ArrayList<InetAddress>(replicas));
             }
         }        
     }
@@ -200,7 +200,7 @@
     private Set<Token> getTokensForLeavingNodes()
     {
         Set<Token> tokens = new HashSet<Token>();
-        for ( EndPoint target : targets_ )
+        for ( InetAddress target : targets_ )
         {
             tokens.add(tokenMetadata_.getToken(target));
         }        
@@ -277,16 +277,16 @@
     public static void main(String[] args) throws Throwable
     {
         StorageService ss = StorageService.instance();
-        ss.updateTokenMetadataUnsafe(new BigIntegerToken("3"), new EndPoint("A", 7000));
-        ss.updateTokenMetadataUnsafe(new BigIntegerToken("6"), new EndPoint("B", 7000));
-        ss.updateTokenMetadataUnsafe(new BigIntegerToken("9"), new EndPoint("C", 7000));
-        ss.updateTokenMetadataUnsafe(new BigIntegerToken("12"), new EndPoint("D", 7000));
-        ss.updateTokenMetadataUnsafe(new BigIntegerToken("15"), new EndPoint("E", 7000));
-        ss.updateTokenMetadataUnsafe(new BigIntegerToken("18"), new EndPoint("F", 7000));
-        ss.updateTokenMetadataUnsafe(new BigIntegerToken("21"), new EndPoint("G", 7000));
-        ss.updateTokenMetadataUnsafe(new BigIntegerToken("24"), new EndPoint("H", 7000));
+        ss.updateTokenMetadataUnsafe(new BigIntegerToken("3"), InetAddress.getByName("A"));
+        ss.updateTokenMetadataUnsafe(new BigIntegerToken("6"), InetAddress.getByName("B"));
+        ss.updateTokenMetadataUnsafe(new BigIntegerToken("9"), InetAddress.getByName("C"));
+        ss.updateTokenMetadataUnsafe(new BigIntegerToken("12"), InetAddress.getByName("D"));
+        ss.updateTokenMetadataUnsafe(new BigIntegerToken("15"), InetAddress.getByName("E"));
+        ss.updateTokenMetadataUnsafe(new BigIntegerToken("18"), InetAddress.getByName("F"));
+        ss.updateTokenMetadataUnsafe(new BigIntegerToken("21"), InetAddress.getByName("G"));
+        ss.updateTokenMetadataUnsafe(new BigIntegerToken("24"), InetAddress.getByName("H"));
         
-        Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new Token[]{new BigIntegerToken("22"), new BigIntegerToken("23")} );
+        Runnable runnable = new LeaveJoinProtocolImpl( new InetAddress[]{InetAddress.getByName("C"), InetAddress.getByName("D")}, new Token[]{new BigIntegerToken("22"), new BigIntegerToken("23")} );
         runnable.run();
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java Wed Oct 21 18:26:02 2009
@@ -30,7 +30,7 @@
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.cassandra.utils.BoundedStatsDeque;
@@ -75,7 +75,7 @@
         return failureDetector_;
     }
     
-    private Map<EndPoint, ArrivalWindow> arrivalSamples_ = new Hashtable<EndPoint, ArrivalWindow>();
+    private Map<InetAddress, ArrivalWindow> arrivalSamples_ = new Hashtable<InetAddress, ArrivalWindow>();
     private List<IFailureDetectionEventListener> fdEvntListeners_ = new ArrayList<IFailureDetectionEventListener>();
     
     public FailureDetector()
@@ -117,7 +117,7 @@
      * 
      * @param ep for which the arrival window needs to be dumped.
      */
-    private void dumpInterArrivalTimes(EndPoint ep)
+    private void dumpInterArrivalTimes(InetAddress ep)
     {
         long now = System.currentTimeMillis();
         if ( (now - FailureDetector.creationTime_) <= FailureDetector.uptimeThreshold_ )
@@ -135,26 +135,19 @@
         }
     }
     
-    public boolean isAlive(EndPoint ep)
+    public boolean isAlive(InetAddress ep)
     {
-        try
-        {
-            /* If the endpoint in question is the local endpoint return true. */
-            String localHost = FBUtilities.getHostAddress();
-            if ( localHost.equals( ep.getHost() ) )
-                    return true;
-        }
-        catch( UnknownHostException ex )
-        {
-            logger_.info( LogUtil.throwableToString(ex) );
-        }
+       /* If the endpoint in question is the local endpoint return true. */
+        InetAddress localHost = FBUtilities.getLocalAddress();
+        if (localHost.equals(ep))
+            return true;
+
     	/* Incoming port is assumed to be the Storage port. We need to change it to the control port */
-    	EndPoint ep2 = new EndPoint(ep.getHost(), DatabaseDescriptor.getControlPort());        
-        EndPointState epState = Gossiper.instance().getEndPointStateForEndPoint(ep2);
+        EndPointState epState = Gossiper.instance().getEndPointStateForEndPoint(ep);
         return epState.isAlive();
     }
     
-    public void report(EndPoint ep)
+    public void report(InetAddress ep)
     {
         if (logger_.isTraceEnabled())
             logger_.trace("reporting " + ep);
@@ -168,7 +161,7 @@
         heartbeatWindow.add(now);
     }
     
-    public void interpret(EndPoint ep)
+    public void interpret(InetAddress ep)
     {
         ArrivalWindow hbWnd = arrivalSamples_.get(ep);
         if ( hbWnd == null )
@@ -214,10 +207,10 @@
     public String toString()
     {
         StringBuilder sb = new StringBuilder();
-        Set<EndPoint> eps = arrivalSamples_.keySet();
+        Set<InetAddress> eps = arrivalSamples_.keySet();
         
         sb.append("-----------------------------------------------------------------------");
-        for ( EndPoint ep : eps )
+        for ( InetAddress ep : eps )
         {
             ArrivalWindow hWnd = arrivalSamples_.get(ep);
             sb.append(ep + " : ");

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java Wed Oct 21 18:26:02 2009
@@ -24,7 +24,7 @@
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.net.*;
 
 /**
@@ -40,7 +40,7 @@
         serializer_ = new GossipDigestSerializer();
     }
     
-    EndPoint endPoint_;
+    InetAddress endPoint_;
     int generation_;
     int maxVersion_;
 
@@ -49,14 +49,14 @@
         return serializer_;
     }
     
-    GossipDigest(EndPoint endPoint, int generation, int maxVersion)
+    GossipDigest(InetAddress endPoint, int generation, int maxVersion)
     {
         endPoint_ = endPoint;
         generation_ = generation; 
         maxVersion_ = maxVersion;
     }
     
-    EndPoint getEndPoint()
+    InetAddress getEndPoint()
     {
         return endPoint_;
     }
@@ -101,7 +101,7 @@
 
     public GossipDigest deserialize(DataInputStream dis) throws IOException
     {
-        EndPoint endPoint = CompactEndPointSerializationHelper.deserialize(dis);
+        InetAddress endPoint = CompactEndPointSerializationHelper.deserialize(dis);
         int generation = dis.readInt();
         int version = dis.readInt();
         return new GossipDigest(endPoint, generation, version);