You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/06/29 20:55:53 UTC

svn commit: r1141194 [1/2] - in /cassandra/trunk: conf/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/marshal/ src/java/org/apache/cassandra/db/migration/ src/java/org/apache/cassandra/dht/ src...

Author: brandonwilliams
Date: Wed Jun 29 18:55:50 2011
New Revision: 1141194

URL: http://svn.apache.org/viewvc?rev=1141194&view=rev
Log:
Add new config parameter, broadcast_address.
Patch by Vijay Parthasarathy, reviewed by brandonwilliams.

Modified:
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java
    cassandra/trunk/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java
    cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UUIDType.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java
    cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/LocalStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/BulkLoader.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/Mx4jTool.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/NodeId.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/RowIterationTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/marshal/DynamicCompositeTypeTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/marshal/RoundTripTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/migration/SerializationsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/gms/SerializationsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java

Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Wed Jun 29 18:55:50 2011
@@ -183,6 +183,10 @@ storage_port: 7000
 # Setting this to 0.0.0.0 is always wrong.
 listen_address: localhost
 
+# Address to broadcast to other Cassandra nodes
+# Leaving this blank will set it to the same value as listen_address
+# broadcast_address: 1.2.3.4
+
 # The address to bind the Thrift RPC service to -- clients connect
 # here. Unlike ListenAddress above, you *can* specify 0.0.0.0 here if
 # you want Thrift to listen on all interfaces.

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Wed Jun 29 18:55:50 2011
@@ -63,6 +63,7 @@ public class Config
     
     public Integer storage_port = 7000;
     public String listen_address;
+    public String broadcast_address;
     
     public String rpc_address;
     public Integer rpc_port = 9160;

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Jun 29 18:55:50 2011
@@ -59,6 +59,7 @@ public class DatabaseDescriptor
 
     private static IEndpointSnitch snitch;
     private static InetAddress listenAddress; // leave null so we can fall through to getLocalHost
+    private static InetAddress broadcastAddress;
     private static InetAddress rpcAddress;
     private static SeedProvider seedProvider;
     /* Current index into the above list of directories */
@@ -244,18 +245,31 @@ public class DatabaseDescriptor
             /* Local IP or hostname to bind services to */
             if (conf.listen_address != null)
             {
-                if (conf.listen_address.equals("0.0.0.0"))
+                try
+                {
+                    listenAddress = InetAddress.getByName(conf.listen_address);
+                }
+                catch (UnknownHostException e)
                 {
-                    throw new ConfigurationException("listen_address must be a single interface.  See http://wiki.apache.org/cassandra/FAQ#cant_listen_on_ip_any");
+                    throw new ConfigurationException("Unknown listen_address '" + conf.listen_address + "'");
+                }
+            }
+
+            /* Gossip Address to broadcast */
+            if (conf.broadcast_address != null)
+            {
+                if (conf.broadcast_address.equals("0.0.0.0"))
+                {
+                    throw new ConfigurationException("broadcast_address cannot be 0.0.0.0!");
                 }
                 
                 try
                 {
-                    listenAddress = InetAddress.getByName(conf.listen_address);
+                    broadcastAddress = InetAddress.getByName(conf.broadcast_address);
                 }
                 catch (UnknownHostException e)
                 {
-                    throw new ConfigurationException("Unknown listen_address '" + conf.listen_address + "'");
+                    throw new ConfigurationException("Unknown broadcast_address '" + conf.broadcast_address + "'");
                 }
             }
             
@@ -858,6 +872,16 @@ public class DatabaseDescriptor
         return listenAddress;
     }
     
+    public static InetAddress getBroadcastAddress()
+    {
+        return broadcastAddress;
+    }
+    
+    public static void setBroadcastAddress(InetAddress broadcastAdd)
+    {
+        broadcastAddress = broadcastAdd;
+    }
+    
     public static InetAddress getRpcAddress()
     {
         return rpcAddress;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java Wed Jun 29 18:55:50 2011
@@ -222,7 +222,7 @@ public class CounterMutation implements 
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         serializer().serialize(this, dos, version);
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.COUNTER_MUTATION, bos.toByteArray(), version);
+        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.COUNTER_MUTATION, bos.toByteArray(), version);
     }
 
     public boolean shouldReplicateOnWrite()

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java Wed Jun 29 18:55:50 2011
@@ -51,7 +51,7 @@ public class CounterMutationVerbHandler 
             if (logger.isDebugEnabled())
               logger.debug("Applying forwarded " + cm);
 
-            String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getLocalAddress());
+            String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
             StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter).get();
             WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true);
             Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Wed Jun 29 18:55:50 2011
@@ -282,7 +282,7 @@ public class HintedHandOffManager implem
         waited = 0;
         // then wait for the correct schema version.
         while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
-                gossiper.getEndpointStateForEndpoint(FBUtilities.getLocalAddress()).getApplicationState(ApplicationState.SCHEMA).value))
+                gossiper.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddress()).getApplicationState(ApplicationState.SCHEMA).value))
         {
             Thread.sleep(1000);
             waited += 1000;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java Wed Jun 29 18:55:50 2011
@@ -66,7 +66,7 @@ public class IndexScanCommand implements
         {
             throw new IOError(e);
         }
-        return new Message(FBUtilities.getLocalAddress(),
+        return new Message(FBUtilities.getBroadcastAddress(),
                            StorageService.Verb.INDEX_SCAN,
                            Arrays.copyOf(dob.getData(), dob.getLength()),
                            version);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Wed Jun 29 18:55:50 2011
@@ -91,7 +91,7 @@ public class RangeSliceCommand implement
     {
         DataOutputBuffer dob = new DataOutputBuffer();
         serializer.serialize(this, dob, version);
-        return new Message(FBUtilities.getLocalAddress(),
+        return new Message(FBUtilities.getBroadcastAddress(),
                            StorageService.Verb.RANGE_SLICE,
                            Arrays.copyOf(dob.getData(), dob.getLength()), version);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java Wed Jun 29 18:55:50 2011
@@ -49,7 +49,7 @@ public class RangeSliceReply
             Row.serializer().serialize(row, dob, originalMessage.getVersion());
         }
         byte[] data = Arrays.copyOf(dob.getData(), dob.getLength());
-        return originalMessage.getReply(FBUtilities.getLocalAddress(), data, originalMessage.getVersion());
+        return originalMessage.getReply(FBUtilities.getBroadcastAddress(), data, originalMessage.getVersion());
     }
 
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Wed Jun 29 18:55:50 2011
@@ -53,7 +53,7 @@ public abstract class ReadCommand implem
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         ReadCommand.serializer().serialize(this, dos, version);
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.READ, bos.toByteArray(), version);
+        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.READ, bos.toByteArray(), version);
     }
 
     public final QueryPath queryPath;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Wed Jun 29 18:55:50 2011
@@ -76,7 +76,7 @@ public class ReadVerbHandler implements 
             byte[] bytes = new byte[readCtx.bufOut_.getLength()];
             System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
 
-            Message response = message.getReply(FBUtilities.getLocalAddress(), bytes, message.getVersion());
+            Message response = message.getReply(FBUtilities.getBroadcastAddress(), bytes, message.getVersion());
             if (logger_.isDebugEnabled())
               logger_.debug(String.format("Read key %s; sending response to %s@%s",
                                           ByteBufferUtil.bytesToHex(command.key), id, message.getFrom()));

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Wed Jun 29 18:55:50 2011
@@ -222,7 +222,7 @@ public class RowMutation implements IMut
 
     public Message makeRowMutationMessage(StorageService.Verb verb, int version) throws IOException
     {
-        return new Message(FBUtilities.getLocalAddress(), verb, getSerializedBuffer(version), version);
+        return new Message(FBUtilities.getBroadcastAddress(), verb, getSerializedBuffer(version), version);
     }
 
     public synchronized byte[] getSerializedBuffer(int version) throws IOException

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Wed Jun 29 18:55:50 2011
@@ -88,7 +88,7 @@ public class RowMutationVerbHandler impl
         // remove fwds from message to avoid infinite loop
         message.removeHeader(RowMutation.FORWARD_HEADER);
 
-        int bytesPerInetAddress = FBUtilities.getLocalAddress().getAddress().length;
+        int bytesPerInetAddress = FBUtilities.getBroadcastAddress().getAddress().length;
         assert forwardBytes.length >= bytesPerInetAddress;
         assert forwardBytes.length % bytesPerInetAddress == 0;
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Wed Jun 29 18:55:50 2011
@@ -407,7 +407,7 @@ public class SystemTable
     public static void writeCurrentLocalNodeId(NodeId oldNodeId, NodeId newNodeId)
     {
         long now = System.currentTimeMillis();
-        ByteBuffer ip = ByteBuffer.wrap(FBUtilities.getLocalAddress().getAddress());
+        ByteBuffer ip = ByteBuffer.wrap(FBUtilities.getBroadcastAddress().getAddress());
 
         ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, NODE_ID_CF);
         cf.addColumn(new Column(newNodeId.bytes(), ip, now));

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java Wed Jun 29 18:55:50 2011
@@ -54,7 +54,7 @@ public class TruncateResponse
     	ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         TruncateResponse.serializer().serialize(truncateResponseMessage, dos, original.getVersion());
-        return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray(), original.getVersion());
+        return original.getReply(FBUtilities.getBroadcastAddress(), bos.toByteArray(), original.getVersion());
     }
 
     public TruncateResponse(String keyspace, String columnFamily, boolean success) {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java Wed Jun 29 18:55:50 2011
@@ -72,7 +72,7 @@ public class Truncation implements Messa
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         serializer().serialize(this, dos, version);
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TRUNCATE, bos.toByteArray(), version);
+        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.TRUNCATE, bos.toByteArray(), version);
     }
 
     public String toString()

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java Wed Jun 29 18:55:50 2011
@@ -49,7 +49,7 @@ public class WriteResponse 
     	ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         WriteResponse.serializer().serialize(writeResponseMessage, dos, original.getVersion());
-        return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray(), original.getVersion());
+        return original.getReply(FBUtilities.getBroadcastAddress(), bos.toByteArray(), original.getVersion());
     }
 
 	private final String table_;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java Wed Jun 29 18:55:50 2011
@@ -156,7 +156,7 @@ public class TimeUUIDType extends Abstra
         }
         else if (source.toLowerCase().equals("now"))
         {
-            idBytes = ByteBuffer.wrap(UUIDGen.decompose(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress())));
+            idBytes = ByteBuffer.wrap(UUIDGen.decompose(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress())));
         }
         // Milliseconds since epoch?
         else if (source.matches("^\\d+$"))

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UUIDType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UUIDType.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UUIDType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UUIDType.java Wed Jun 29 18:55:50 2011
@@ -227,7 +227,7 @@ public class UUIDType extends AbstractUU
         }
         else if (source.toLowerCase().equals("now"))
         {
-            idBytes = ByteBuffer.wrap(UUIDGen.decompose(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress())));
+            idBytes = ByteBuffer.wrap(UUIDGen.decompose(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress())));
         }
         // Milliseconds since epoch?
         else if (source.matches("^\\d+$"))

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java Wed Jun 29 18:55:50 2011
@@ -40,7 +40,7 @@ public class AddColumnFamily extends Mig
     
     public AddColumnFamily(CFMetaData cfm) throws ConfigurationException, IOException
     {
-        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), DatabaseDescriptor.getDefsVersion());
         this.cfm = cfm;
         KSMetaData ksm = DatabaseDescriptor.getTableDefinition(cfm.ksName);
         

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java Wed Jun 29 18:55:50 2011
@@ -38,7 +38,7 @@ public class AddKeyspace extends Migrati
     
     public AddKeyspace(KSMetaData ksm) throws ConfigurationException, IOException
     {
-        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), DatabaseDescriptor.getDefsVersion());
         
         if (DatabaseDescriptor.getTableDefinition(ksm.name) != null)
             throw new ConfigurationException("Keyspace already exists.");

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java Wed Jun 29 18:55:50 2011
@@ -44,7 +44,7 @@ public class DropColumnFamily extends Mi
     
     public DropColumnFamily(String tableName, String cfName) throws ConfigurationException, IOException
     {
-        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), DatabaseDescriptor.getDefsVersion());
         this.tableName = tableName;
         this.cfName = cfName;
         

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java Wed Jun 29 18:55:50 2011
@@ -41,7 +41,7 @@ public class DropKeyspace extends Migrat
     
     public DropKeyspace(String name) throws ConfigurationException, IOException
     {
-        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), DatabaseDescriptor.getDefsVersion());
         this.name = name;
         KSMetaData ksm = DatabaseDescriptor.getTableDefinition(name);
         if (ksm == null)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java Wed Jun 29 18:55:50 2011
@@ -47,7 +47,7 @@ public class RenameColumnFamily extends 
     // be called during deserialization of this migration.
     public RenameColumnFamily(String tableName, String oldName, String newName) throws ConfigurationException, IOException
     {
-        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), DatabaseDescriptor.getDefsVersion());
         this.tableName = tableName;
         this.oldName = oldName;
         this.newName = newName;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java Wed Jun 29 18:55:50 2011
@@ -46,7 +46,7 @@ public class RenameKeyspace extends Migr
     
     public RenameKeyspace(String oldName, String newName) throws ConfigurationException, IOException
     {
-        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), DatabaseDescriptor.getDefsVersion());
         this.oldName = oldName;
         this.newName = newName;
         

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java Wed Jun 29 18:55:50 2011
@@ -39,7 +39,7 @@ public class UpdateColumnFamily extends 
     /** assumes validation has already happened. That is, replacing oldCfm with newCfm is neither illegal or totally whackass. */
     public UpdateColumnFamily(org.apache.cassandra.db.migration.avro.CfDef cf_def) throws ConfigurationException, IOException
     {
-        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), DatabaseDescriptor.getDefsVersion());
         
         KSMetaData ksm = DatabaseDescriptor.getTableDefinition(cf_def.keyspace.toString());
         if (ksm == null)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java Wed Jun 29 18:55:50 2011
@@ -40,7 +40,7 @@ public class UpdateKeyspace extends Migr
     /** create migration based on thrift parameters */
     public UpdateKeyspace(KSMetaData ksm) throws ConfigurationException, IOException
     {
-        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), DatabaseDescriptor.getDefsVersion());
         
         assert ksm != null;
         assert ksm.cfMetaData() != null;

Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Wed Jun 29 18:55:50 2011
@@ -184,7 +184,7 @@ public class BootStrapper
         });
 
         InetAddress maxEndpoint = endpoints.get(endpoints.size() - 1);
-        assert !maxEndpoint.equals(FBUtilities.getLocalAddress());
+        assert !maxEndpoint.equals(FBUtilities.getBroadcastAddress());
         if (metadata.pendingRangeChanges(maxEndpoint) > 0)
             throw new RuntimeException("Every node is a bootstrap source! Please specify an initial token manually or wait for an existing bootstrap operation to finish.");
         
@@ -218,7 +218,7 @@ public class BootStrapper
 
     static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint)
     {
-        Message message = new Message(FBUtilities.getLocalAddress(), 
+        Message message = new Message(FBUtilities.getBroadcastAddress(),
                                       StorageService.Verb.BOOTSTRAP_TOKEN, 
                                       ArrayUtils.EMPTY_BYTE_ARRAY, 
                                       Gossiper.instance.getVersion(maxEndpoint));

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java Wed Jun 29 18:55:50 2011
@@ -110,7 +110,7 @@ public class FailureDetector implements 
     
     public boolean isAlive(InetAddress ep)
     {
-        if (ep.equals(FBUtilities.getLocalAddress()))
+        if (ep.equals(FBUtilities.getBroadcastAddress()))
             return true;
 
         EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(ep);

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Wed Jun 29 18:55:50 2011
@@ -108,9 +108,9 @@ public class Gossiper implements IFailur
                 MessagingService.instance().waitUntilListening();
                 
                 /* Update the local heartbeat counter. */
-                endpointStateMap.get(FBUtilities.getLocalAddress()).getHeartBeatState().updateHeartBeat();
+                endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat();
                 if (logger.isTraceEnabled())
-                    logger.trace("My heartbeat is now " + endpointStateMap.get(FBUtilities.getLocalAddress()).getHeartBeatState().getHeartBeatVersion());
+                    logger.trace("My heartbeat is now " + endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().getHeartBeatVersion());
                 final List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
                 Gossiper.instance.makeRandomGossipDigest(gDigests);
 
@@ -210,8 +210,8 @@ public class Gossiper implements IFailur
     public Set<InetAddress> getLiveMembers()
     {
         Set<InetAddress> liveMbrs = new HashSet<InetAddress>(liveEndpoints);
-        if (!liveMbrs.contains(FBUtilities.getLocalAddress()))
-            liveMbrs.add(FBUtilities.getLocalAddress());
+        if (!liveMbrs.contains(FBUtilities.getBroadcastAddress()))
+            liveMbrs.add(FBUtilities.getBroadcastAddress());
         return liveMbrs;
     }
 
@@ -338,7 +338,7 @@ public class Gossiper implements IFailur
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos, version);
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray(), version);
+        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray(), version);
     }
 
     Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage, int version) throws IOException
@@ -346,7 +346,7 @@ public class Gossiper implements IFailur
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos, version);
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray(), version);
+        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray(), version);
     }
 
     Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message, int version) throws IOException
@@ -354,7 +354,7 @@ public class Gossiper implements IFailur
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos, version);
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray(), version);
+        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray(), version);
     }
 
     /**
@@ -414,7 +414,7 @@ public class Gossiper implements IFailur
         int size = seeds.size();
         if ( size > 0 )
         {
-            if ( size == 1 && seeds.contains(FBUtilities.getLocalAddress()) )
+            if ( size == 1 && seeds.contains(FBUtilities.getBroadcastAddress()) )
             {
                 return;
             }
@@ -441,7 +441,7 @@ public class Gossiper implements IFailur
         Set<InetAddress> eps = endpointStateMap.keySet();
         for ( InetAddress endpoint : eps )
         {
-            if ( endpoint.equals(FBUtilities.getLocalAddress()) )
+            if ( endpoint.equals(FBUtilities.getBroadcastAddress()) )
                 continue;
 
             FailureDetector.instance.interpret(endpoint);
@@ -655,7 +655,7 @@ public class Gossiper implements IFailur
         for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
         {
             InetAddress ep = entry.getKey();
-            if ( ep.equals(FBUtilities.getLocalAddress()))
+            if ( ep.equals(FBUtilities.getBroadcastAddress()))
                 continue;
             if (justRemovedEndpoints.containsKey(ep))
             {
@@ -832,14 +832,14 @@ public class Gossiper implements IFailur
         Set<InetAddress> seedHosts = DatabaseDescriptor.getSeeds();
         for (InetAddress seed : seedHosts)
         {
-            if (seed.equals(FBUtilities.getLocalAddress()))
+            if (seed.equals(FBUtilities.getBroadcastAddress()))
                 continue;
             seeds.add(seed);
         }
 
         /* initialize the heartbeat state for this localEndpoint */
         maybeInitializeLocalState(generationNbr);
-        EndpointState localState = endpointStateMap.get(FBUtilities.getLocalAddress());
+        EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
 
         //notify snitches that Gossiper is about to start
         DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
@@ -855,13 +855,13 @@ public class Gossiper implements IFailur
     // initialize local HB state if needed.
     public void maybeInitializeLocalState(int generationNbr) 
     {
-        EndpointState localState = endpointStateMap.get(FBUtilities.getLocalAddress());
+        EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
         if ( localState == null )
         {
             HeartBeatState hbState = new HeartBeatState(generationNbr);
             localState = new EndpointState(hbState);
             localState.markAlive();
-            endpointStateMap.put(FBUtilities.getLocalAddress(), localState);
+            endpointStateMap.put(FBUtilities.getBroadcastAddress(), localState);
         }
     }
     
@@ -882,7 +882,7 @@ public class Gossiper implements IFailur
 
     public void addLocalApplicationState(ApplicationState state, VersionedValue value)
     {
-        EndpointState epState = endpointStateMap.get(FBUtilities.getLocalAddress());
+        EndpointState epState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
         assert epState != null;
         epState.addApplicationState(state, value);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Wed Jun 29 18:55:50 2011
@@ -170,7 +170,7 @@ public abstract class AbstractReplicatio
         //
         // we do a 2nd pass on targets instead of using temporary storage,
         // to optimize for the common case (everything was alive).
-        InetAddress localAddress = FBUtilities.getLocalAddress();
+        InetAddress localAddress = FBUtilities.getBroadcastAddress();
         for (InetAddress ep : targets)
         {
             if (map.containsKey(ep))

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Wed Jun 29 18:55:50 2011
@@ -131,7 +131,7 @@ public class DynamicEndpointSnitch exten
 
     public void sortByProximity(final InetAddress address, List<InetAddress> addresses)
     {
-        assert address.equals(FBUtilities.getLocalAddress()); // we only know about ourself
+        assert address.equals(FBUtilities.getBroadcastAddress()); // we only know about ourself
         if (BADNESS_THRESHOLD == 0)
         {
             sortByProximityWithScore(address, addresses);

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java Wed Jun 29 18:55:50 2011
@@ -81,14 +81,14 @@ public class Ec2Snitch extends AbstractN
 
     public String getRack(InetAddress endpoint)
     {
-        if (endpoint.equals(FBUtilities.getLocalAddress()))
+        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
             return ec2zone;
         return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RACK).value;
     }
 
     public String getDatacenter(InetAddress endpoint)
     {
-        if (endpoint.equals(FBUtilities.getLocalAddress()))
+        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
             return ec2region;
         return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.DC).value;
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/LocalStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/LocalStrategy.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/LocalStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/LocalStrategy.java Wed Jun 29 18:55:50 2011
@@ -37,7 +37,7 @@ public class LocalStrategy extends Abstr
 
     public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
     {
-        return Arrays.asList(FBUtilities.getLocalAddress());
+        return Arrays.asList(FBUtilities.getBroadcastAddress());
     }
 
     public int getReplicationFactor()

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Wed Jun 29 18:55:50 2011
@@ -66,93 +66,94 @@ public class IncomingTcpConnection exten
             MessagingService.validateMagic(input.readInt());
             int header = input.readInt();
             isStream = MessagingService.getBits(header, 3, 1) == 1;
-            if (!isStream)
-                // we should buffer
-                input = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096));
             version = MessagingService.getBits(header, 15, 8);
-            if (logger.isDebugEnabled())
-                logger.debug("Version for " + socket.getInetAddress() + " is " + version);
-        }
-        catch (IOException e)
-        {
-            close();
-            throw new IOError(e);
-        }
-
-        if (version > MessagingService.version_)
-        {
-            // save the endpoint so gossip will reconnect to it
-            Gossiper.instance.addSavedEndpoint(socket.getInetAddress());
-            logger.info("Received " + (isStream ? "streaming " : "") + "connection from newer protocol version. Ignorning");
-
-            // streaming connections are per-session and have a fixed version.  we can't do anything with a new-version
-            // stream connection, so drop it.
             if (isStream)
             {
-                close();
-                return;
-            }
-            // for non-streaming connections, continue to read the messages (and ignore them) until sender
-            // starts sending correct-version messages (which it can do without reconnecting -- version is per-Message)
-        }
-        else
-        {
-            // only set version when <= to us, otherwise it's the responsibility of the other end to mimic us
-            Gossiper.instance.setVersion(socket.getInetAddress(), version);
-        }
-
-        while (true)
-        {
-            try
-            {
-                if (isStream)
+                if (version == MessagingService.version_)
                 {
                     int size = input.readInt();
                     byte[] headerBytes = new byte[size];
                     input.readFully(headerBytes);
                     stream(StreamHeader.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(headerBytes)), version), input);
-                    break;
-                }
+                } 
                 else
                 {
-                    int size = input.readInt();
-                    byte[] contentBytes = new byte[size];
-                    // readFully allocates a direct buffer the size of the chunk it is asked to read,
-                    // so we cap that at CHUNK_SIZE.  See https://issues.apache.org/jira/browse/CASSANDRA-2654
-                    int remainder = size % CHUNK_SIZE;
-                    for (int offset = 0; offset < size - remainder; offset += CHUNK_SIZE)
-                        input.readFully(contentBytes, offset, CHUNK_SIZE);
-                    input.readFully(contentBytes, size - remainder, remainder);
-
-                    if (version <= MessagingService.version_)
-                    {
-                        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(contentBytes));
-                        String id = dis.readUTF();
-                        Message message = Message.serializer().deserialize(dis, version);
-                        MessagingService.instance().receive(message, id);
-                    }
+                    // streaming connections are per-session and have a fixed version.  we can't do anything with a new-version stream connection, so drop it.
+                    logger.error("Received untranslated stream from newer protcol version. Terminating connection!");
                 }
-                // prepare to read the next message
-                MessagingService.validateMagic(input.readInt());
-                int header = input.readInt();
-                assert isStream == (MessagingService.getBits(header, 3, 1) == 1) : "Connections cannot change type: " + isStream;
-                version = MessagingService.getBits(header, 15, 8);
+                // We are done with this connection....
+                return;
             }
-            catch (EOFException e)
+            
+            // we should buffer
+            input = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096));
+            // Receive the first message to set the version.
+            Message msg = receiveMessage(input, version);
+            if (version > MessagingService.version_)
             {
-                if (logger.isTraceEnabled())
-                    logger.trace("eof reading from socket; closing", e);
-                break;
+                // save the endpoint so gossip will reconnect to it
+                Gossiper.instance.addSavedEndpoint(socket.getInetAddress());
+                logger.info("Received " + (isStream ? "streaming " : "") + "connection from newer protocol version. Ignorning");
             }
-            catch (IOException e) 
+            else if (msg != null)
             {
-                if (logger.isDebugEnabled())
-                    logger.debug("error reading from socket; closing", e);
-                break;
+                Gossiper.instance.setVersion(msg.getFrom(), version);
             }
+            
+            // loop to get the next message.
+            while (true)
+            {
+                // prepare to read the next message
+                MessagingService.validateMagic(input.readInt());
+                header = input.readInt();
+                assert isStream == (MessagingService.getBits(header, 3, 1) == 1) : "Connections cannot change type: " + isStream;
+                version = MessagingService.getBits(header, 15, 8);
+                receiveMessage(input, version);
+            }
+        } 
+        catch (EOFException e)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("eof reading from socket; closing", e);
+            // connection will be reset so no need to throw an exception.
+        } 
+        catch (IOException e)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("IOError reading from socket; closing", e);
+            // throw to be logged else where.
+            throw new IOError(e);
+        } 
+        finally
+        {
+            // cleanup.
+            close();
         }
+    }
 
-        close();
+    private Message receiveMessage(DataInputStream input, int version) throws IOException
+    {
+        int size = input.readInt();
+        byte[] contentBytes = new byte[size];
+        // readFully allocates a direct buffer the size of the chunk it is asked to read,
+        // so we cap that at CHUNK_SIZE. See https://issues.apache.org/jira/browse/CASSANDRA-2654
+        int remainder = size % CHUNK_SIZE;
+        for (int offset = 0; offset < size - remainder; offset += CHUNK_SIZE)
+            input.readFully(contentBytes, offset, CHUNK_SIZE);
+        input.readFully(contentBytes, size - remainder, remainder);
+
+        // for non-streaming connections, continue to read the messages (and ignore them) until sender
+        // starts sending correct-version messages (which it can do without reconnecting -- version is per-Message)
+        if (version <= MessagingService.version_)
+        {
+            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(contentBytes));
+            String id = dis.readUTF();
+            Message message = Message.serializer().deserialize(dis, version);
+            MessagingService.instance().receive(message, id);
+            return message;
+        }
+        logger.info("Received connection from newer protocol version. Ignorning message.");
+        return null;
     }
 
     private void close()

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Wed Jun 29 18:55:50 2011
@@ -111,7 +111,7 @@ public class Message
 
     public Message getInternalReply(byte[] body, int version)
     {
-        Header header = new Header(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE);
+        Header header = new Header(FBUtilities.getBroadcastAddress(), StorageService.Verb.INTERNAL_RESPONSE);
         return new Message(header, body, version);
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jun 29 18:55:50 2011
@@ -341,7 +341,7 @@ public final class MessagingService impl
     private void sendOneWay(Message message, String id, InetAddress to)
     {
         if (logger_.isTraceEnabled())
-            logger_.trace(FBUtilities.getLocalAddress() + " sending " + message.getVerb() + " to " + id + "@" + to);
+            logger_.trace(FBUtilities.getBroadcastAddress() + " sending " + message.getVerb() + " to " + id + "@" + to);
 
         // do local deliveries
         if ( message.getFrom().equals(to) )

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java Wed Jun 29 18:55:50 2011
@@ -43,7 +43,7 @@ public abstract class AbstractRowResolve
 {
     protected static Logger logger = LoggerFactory.getLogger(AbstractRowResolver.class);
 
-    private static final Message FAKE_MESSAGE = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY, -1);
+    private static final Message FAKE_MESSAGE = new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY, -1);
 
     protected final String table;
     protected final ConcurrentMap<Message, ReadResponse> replies = new NonBlockingHashMap<Message, ReadResponse>();

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Wed Jun 29 18:55:50 2011
@@ -167,7 +167,7 @@ public class AntiEntropyService
         if (!replicaSets.containsKey(range))
             return Collections.emptySet();
         Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(range));
-        neighbors.remove(FBUtilities.getLocalAddress());
+        neighbors.remove(FBUtilities.getBroadcastAddress());
         // Excluding all node with version <= 0.7 since they don't know how to
         // create a correct merkle tree (they build it over the full range)
         Iterator<InetAddress> iter = neighbors.iterator();
@@ -188,7 +188,7 @@ public class AntiEntropyService
      */
     private void rendezvous(TreeRequest request, MerkleTree tree)
     {
-        InetAddress LOCAL = FBUtilities.getLocalAddress();
+        InetAddress LOCAL = FBUtilities.getBroadcastAddress();
 
         // the rendezvous pairs for this session
         Map<TreeRequest, TreePair> ctrees = rendezvousPairs(request.sessionid);
@@ -416,7 +416,7 @@ public class AntiEntropyService
         public void run()
         {
             // respond to the request that triggered this validation
-            AntiEntropyService.instance.respond(this, FBUtilities.getLocalAddress());
+            AntiEntropyService.instance.respond(this, FBUtilities.getBroadcastAddress());
         }
     }
 
@@ -443,7 +443,7 @@ public class AntiEntropyService
          */
         public void run()
         {
-            InetAddress local = FBUtilities.getLocalAddress();
+            InetAddress local = FBUtilities.getBroadcastAddress();
 
             // restore partitioners (in case we were serialized)
             if (ltree.partitioner() == null)
@@ -540,7 +540,7 @@ public class AntiEntropyService
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
                 SERIALIZER.serialize(request, dos, version);
-                return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TREE_REQUEST, bos.toByteArray(), version);
+                return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.TREE_REQUEST, bos.toByteArray(), version);
             }
             catch(IOException e)
             {
@@ -802,7 +802,7 @@ public class AntiEntropyService
                     for (InetAddress endpoint : endpoints)
                         requests.put(AntiEntropyService.instance.request(getName(), endpoint, range, tablename, cfname), this);
                     // send but don't record an outstanding request to the local node
-                    AntiEntropyService.instance.request(getName(), FBUtilities.getLocalAddress(), range, tablename, cfname);
+                    AntiEntropyService.instance.request(getName(), FBUtilities.getBroadcastAddress(), range, tablename, cfname);
                 }
                 logger.info("Waiting for repair requests: " + requests.keySet());
                 requestsMade.signalAll();

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java Wed Jun 29 18:55:50 2011
@@ -40,7 +40,7 @@ import org.apache.cassandra.utils.FBUtil
 public class DatacenterReadCallback<T> extends ReadCallback<T>
 {
     private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-    private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
+    private static final String localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
 
     public DatacenterReadCallback(IResponseResolver resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java Wed Jun 29 18:55:50 2011
@@ -51,7 +51,7 @@ public class DatacenterSyncWriteResponse
     private static final String localdc;
     static
     {
-        localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
+        localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
     }
 
 	private final NetworkTopologyStrategy strategy;

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java Wed Jun 29 18:55:50 2011
@@ -48,7 +48,7 @@ public class DatacenterWriteResponseHand
     private static final String localdc;
     static
     {
-        localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
+        localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
     }
 
     protected DatacenterWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java Wed Jun 29 18:55:50 2011
@@ -212,7 +212,7 @@ public class MigrationManager implements
         }
         dout.close();
         byte[] body = bout.toByteArray();
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.DEFINITIONS_UPDATE, body, version);
+        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.DEFINITIONS_UPDATE, body, version);
     }
     
     // other half of this transformation is in MigrationManager.

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Wed Jun 29 18:55:50 2011
@@ -222,7 +222,7 @@ public class StorageLoadBalancer impleme
 
     private double localLoad()
     {
-        Double load = loadInfo2_.get(FBUtilities.getLocalAddress());
+        Double load = loadInfo2_.get(FBUtilities.getBroadcastAddress());
         return load == null ? 0 : load;
     }
 
@@ -277,11 +277,11 @@ public class StorageLoadBalancer impleme
 
     private boolean isANeighbour(InetAddress neighbour)
     {
-        InetAddress predecessor = StorageService.instance.getPredecessor(FBUtilities.getLocalAddress());
+        InetAddress predecessor = StorageService.instance.getPredecessor(FBUtilities.getBroadcastAddress());
         if ( predecessor.equals(neighbour) )
             return true;
 
-        InetAddress successor = StorageService.instance.getSuccessor(FBUtilities.getLocalAddress());
+        InetAddress successor = StorageService.instance.getSuccessor(FBUtilities.getBroadcastAddress());
         if ( successor.equals(neighbour) )
             return true;
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Jun 29 18:55:50 2011
@@ -134,7 +134,7 @@ public class StorageProxy implements Sto
     */
     public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, TimeoutException
     {
-        final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getLocalAddress());
+        final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
 
         long startTime = System.nanoTime();
         List<IWriteResponseHandler> responseHandlers = new ArrayList<IWriteResponseHandler>();
@@ -235,7 +235,7 @@ public class StorageProxy implements Sto
             if (targets.size() == 1 && targets.iterator().next().equals(destination))
             {
                 // unhinted writes
-                if (destination.equals(FBUtilities.getLocalAddress()))
+                if (destination.equals(FBUtilities.getBroadcastAddress()))
                 {
                     if (insertLocalMessages)
                         insertLocal(rm, responseHandler);
@@ -378,7 +378,7 @@ public class StorageProxy implements Sto
     {
         InetAddress endpoint = findSuitableEndpoint(cm.getTable(), cm.key());
 
-        if (endpoint.equals(FBUtilities.getLocalAddress()))
+        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
         {
             return applyCounterMutationOnCoordinator(cm, localDataCenter);
         }
@@ -405,7 +405,7 @@ public class StorageProxy implements Sto
     private static InetAddress findSuitableEndpoint(String table, ByteBuffer key) throws UnavailableException
     {
         List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(table, key);
-        DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
+        DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), endpoints);
         if (endpoints.isEmpty())
             throw new UnavailableException();
         return endpoints.get(0);
@@ -509,7 +509,7 @@ public class StorageProxy implements Sto
             logger.debug("Command/ConsistencyLevel is {}/{}", command, consistency_level);
 
             List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
-            DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
+            DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), endpoints);
 
             RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
             ReadCallback<Row> handler = getReadCallback(resolver, command, consistency_level, endpoints);
@@ -526,7 +526,7 @@ public class StorageProxy implements Sto
             }
 
             InetAddress dataPoint = handler.endpoints.get(0);
-            if (dataPoint.equals(FBUtilities.getLocalAddress()))
+            if (dataPoint.equals(FBUtilities.getBroadcastAddress()))
             {
                 if (logger.isDebugEnabled())
                     logger.debug("reading data locally");
@@ -544,7 +544,7 @@ public class StorageProxy implements Sto
             MessageProducer producer = new CachingMessageProducer(digestCommand);
             for (InetAddress digestPoint : handler.endpoints.subList(1, handler.endpoints.size()))
             {
-                if (digestPoint.equals(FBUtilities.getLocalAddress()))
+                if (digestPoint.equals(FBUtilities.getBroadcastAddress()))
                 {
                     if (logger.isDebugEnabled())
                         logger.debug("reading digest locally");
@@ -639,7 +639,7 @@ public class StorageProxy implements Sto
 
             Table table = Table.open(command.table);
             ReadResponse result = ReadVerbHandler.getResponse(command, command.getRow(table));
-            MessagingService.instance().addLatency(FBUtilities.getLocalAddress(), System.currentTimeMillis() - start);
+            MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), System.currentTimeMillis() - start);
             handler.response(result);
         }
     }
@@ -672,9 +672,9 @@ public class StorageProxy implements Sto
             for (AbstractBounds range : ranges)
             {
                 List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right);
-                DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), liveEndpoints);
+                DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints);
 
-                if (consistency_level == ConsistencyLevel.ONE && !liveEndpoints.isEmpty() && liveEndpoints.get(0).equals(FBUtilities.getLocalAddress()))
+                if (consistency_level == ConsistencyLevel.ONE && !liveEndpoints.isEmpty() && liveEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
                 {
                     if (logger.isDebugEnabled())
                         logger.debug("local range slice");
@@ -773,7 +773,7 @@ public class StorageProxy implements Sto
         // an empty message acts as a request to the SchemaCheckVerbHandler.
         for (InetAddress endpoint : liveHosts)
         {
-            Message message = new Message(FBUtilities.getLocalAddress(), 
+            Message message = new Message(FBUtilities.getBroadcastAddress(),
                                           StorageService.Verb.SCHEMA_CHECK, 
                                           ArrayUtils.EMPTY_BYTE_ARRAY, 
                                           Gossiper.instance.getVersion(endpoint));
@@ -953,7 +953,7 @@ public class StorageProxy implements Sto
         for (AbstractBounds range : ranges)
         {
             List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, range.right);
-            DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), liveEndpoints);
+            DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints);
 
             // collect replies and resolve according to consistency level
             RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(keyspace, liveEndpoints);

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Jun 29 18:55:50 2011
@@ -176,12 +176,12 @@ public class StorageService implements I
 
     public Collection<Range> getLocalRanges(String table)
     {
-        return getRangesForEndpoint(table, FBUtilities.getLocalAddress());
+        return getRangesForEndpoint(table, FBUtilities.getBroadcastAddress());
     }
 
     public Range getLocalPrimaryRange()
     {
-        return getPrimaryRangeForEndpoint(FBUtilities.getLocalAddress());
+        return getPrimaryRangeForEndpoint(FBUtilities.getBroadcastAddress());
     }
 
     private Set<InetAddress> replicatingNodes = Collections.synchronizedSet(new HashSet<InetAddress>());
@@ -217,7 +217,7 @@ public class StorageService implements I
         if (logger_.isDebugEnabled())
             logger_.debug("Setting token to {}", token);
         SystemTable.updateToken(token);
-        tokenMetadata_.updateNormalToken(token, FBUtilities.getLocalAddress());
+        tokenMetadata_.updateNormalToken(token, FBUtilities.getBroadcastAddress());
         Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.normal(getLocalToken()));
         setMode("Normal", false);
     }
@@ -431,7 +431,6 @@ public class StorageService implements I
         Gossiper.instance.register(this);
         Gossiper.instance.register(migrationManager);
         Gossiper.instance.start(SystemTable.incrementAndGetGeneration()); // needed for node-ring gathering.
-
         MessagingService.instance().listen(FBUtilities.getLocalAddress());
         StorageLoadBalancer.instance.startBroadcasting();
         MigrationManager.passiveAnnounce(DatabaseDescriptor.getDefsVersion());
@@ -440,19 +439,19 @@ public class StorageService implements I
         HintedHandOffManager.instance.registerMBean();
 
         if (DatabaseDescriptor.isAutoBootstrap()
-                && DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress())
+                && DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress())
                 && !SystemTable.isBootstrapped())
             logger_.info("This node will not auto bootstrap because it is configured to be a seed node.");
 
         Token token;
         if (DatabaseDescriptor.isAutoBootstrap()
-            && !(DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress()) || SystemTable.isBootstrapped()))
+            && !(DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()) || SystemTable.isBootstrapped()))
         {
             setMode("Joining: getting load and schema information", true);
             StorageLoadBalancer.instance.waitForLoadInfo();
             if (logger_.isDebugEnabled())
                 logger_.debug("... got load + schema info");
-            if (tokenMetadata_.isMember(FBUtilities.getLocalAddress()))
+            if (tokenMetadata_.isMember(FBUtilities.getBroadcastAddress()))
             {
                 String s = "This node is already a member of the token ring; bootstrap aborted. (If replacing a dead node, remove the old one from the ring first.)";
                 throw new UnsupportedOperationException(s);
@@ -542,7 +541,7 @@ public class StorageService implements I
             throw new AssertionError(e);
         }
         setMode("Bootstrapping", true);
-        new BootStrapper(FBUtilities.getLocalAddress(), token, tokenMetadata_).bootstrap(); // handles token update
+        new BootStrapper(FBUtilities.getBroadcastAddress(), token, tokenMetadata_).bootstrap(); // handles token update
     }
 
     public boolean isBootstrapMode()
@@ -873,7 +872,7 @@ public class StorageService implements I
         if (removeEndpoint == null)
             return;
         
-        if (removeEndpoint.equals(FBUtilities.getLocalAddress()))
+        if (removeEndpoint.equals(FBUtilities.getBroadcastAddress()))
         {
             logger_.info("Received removeToken gossip about myself. Is this node a replacement for a removed one?");
             return;
@@ -1026,7 +1025,7 @@ public class StorageService implements I
      */
     private Multimap<InetAddress, Range> getNewSourceRanges(String table, Set<Range> ranges) 
     {
-        InetAddress myAddress = FBUtilities.getLocalAddress();
+        InetAddress myAddress = FBUtilities.getBroadcastAddress();
         Multimap<Range, InetAddress> rangeAddresses = Table.open(table).getReplicationStrategy().getRangeAddresses(tokenMetadata_);
         Multimap<InetAddress, Range> sourceRanges = HashMultimap.create();
         IFailureDetector failureDetector = FailureDetector.instance;
@@ -1093,7 +1092,7 @@ public class StorageService implements I
         final Multimap<InetAddress, String> fetchSources = HashMultimap.create();
         Multimap<String, Map.Entry<InetAddress, Collection<Range>>> rangesToFetch = HashMultimap.create();
 
-        final InetAddress myAddress = FBUtilities.getLocalAddress();
+        final InetAddress myAddress = FBUtilities.getBroadcastAddress();
 
         for (String table : DatabaseDescriptor.getNonSystemTables())
         {
@@ -1232,7 +1231,7 @@ public class StorageService implements I
             map.put(entry.getKey().getHostAddress(), FileUtils.stringifyFileSize(entry.getValue()));
         }
         // gossiper doesn't see its own updates, so we need to special-case the local node
-        map.put(FBUtilities.getLocalAddress().getHostAddress(), getLoadString());
+        map.put(FBUtilities.getBroadcastAddress().getHostAddress(), getLoadString());
         return map;
     }
 
@@ -1313,7 +1312,7 @@ public class StorageService implements I
 
     public int getCurrentGenerationNumber()
     {
-        return Gossiper.instance.getCurrentGenerationNumber(FBUtilities.getLocalAddress());
+        return Gossiper.instance.getCurrentGenerationNumber(FBUtilities.getBroadcastAddress());
     }
 
     public void forceTableCleanup(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
@@ -1717,19 +1716,19 @@ public class StorageService implements I
     private void startLeaving()
     {
         Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.leaving(getLocalToken()));
-        tokenMetadata_.addLeavingEndpoint(FBUtilities.getLocalAddress());
+        tokenMetadata_.addLeavingEndpoint(FBUtilities.getBroadcastAddress());
         calculatePendingRanges();
     }
 
     public void decommission() throws InterruptedException
     {
-        if (!tokenMetadata_.isMember(FBUtilities.getLocalAddress()))
+        if (!tokenMetadata_.isMember(FBUtilities.getBroadcastAddress()))
             throw new UnsupportedOperationException("local node is not a member of the token ring yet");
         if (tokenMetadata_.cloneAfterAllLeft().sortedTokens().size() < 2)
             throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless");
         for (String table : DatabaseDescriptor.getNonSystemTables())
         {
-            if (tokenMetadata_.getPendingRanges(table, FBUtilities.getLocalAddress()).size() > 0)
+            if (tokenMetadata_.getPendingRanges(table, FBUtilities.getBroadcastAddress()).size() > 0)
                 throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
         }
 
@@ -1756,7 +1755,7 @@ public class StorageService implements I
     private void leaveRing()
     {
         SystemTable.setBootstrapped(false);
-        tokenMetadata_.removeEndpoint(FBUtilities.getLocalAddress());
+        tokenMetadata_.removeEndpoint(FBUtilities.getBroadcastAddress());
         calculatePendingRanges();
 
         Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalToken()));
@@ -1777,7 +1776,7 @@ public class StorageService implements I
 
         for (final String table : DatabaseDescriptor.getNonSystemTables())
         {
-            Multimap<Range, InetAddress> rangesMM = getChangedRangesForLeaving(table, FBUtilities.getLocalAddress());
+            Multimap<Range, InetAddress> rangesMM = getChangedRangesForLeaving(table, FBUtilities.getBroadcastAddress());
 
             if (logger_.isDebugEnabled())
                 logger_.debug("Ranges needing transfer are [" + StringUtils.join(rangesMM.keySet(), ",") + "]");
@@ -1825,7 +1824,7 @@ public class StorageService implements I
             throw new IOException("target token " + newToken + " is already owned by another node.");
 
         // address of the current node
-        InetAddress localAddress = FBUtilities.getLocalAddress();
+        InetAddress localAddress = FBUtilities.getBroadcastAddress();
         List<String> tablesToProcess = DatabaseDescriptor.getNonSystemTables();
 
         // checking if data is moving to this node
@@ -1988,7 +1987,7 @@ public class StorageService implements I
      */
     public void removeToken(String tokenString)
     {
-        InetAddress myAddress = FBUtilities.getLocalAddress();
+        InetAddress myAddress = FBUtilities.getBroadcastAddress();
         Token localToken = tokenMetadata_.getToken(myAddress);
         Token token = partitioner.getTokenFactory().fromString(tokenString);
         InetAddress endpoint = tokenMetadata_.getEndpoint(token);

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Wed Jun 29 18:55:50 2011
@@ -67,7 +67,7 @@ public class StreamIn
         if (logger.isDebugEnabled())
             logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", "));
         StreamInSession session = StreamInSession.create(source, callback);
-        StreamRequestMessage srm = new StreamRequestMessage(FBUtilities.getLocalAddress(),
+        StreamRequestMessage srm = new StreamRequestMessage(FBUtilities.getBroadcastAddress(),
                                                             ranges,
                                                             tableName,
                                                             columnFamilies,

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java Wed Jun 29 18:55:50 2011
@@ -59,7 +59,7 @@ class StreamReply implements MessageProd
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         serializer.serialize(this, dos, version);
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAM_REPLY, bos.toByteArray(), version);
+        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.STREAM_REPLY, bos.toByteArray(), version);
     }
 
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java Wed Jun 29 18:55:50 2011
@@ -106,7 +106,7 @@ class StreamRequestMessage implements Me
         {
             throw new IOError(e);
         }
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAM_REQUEST, bos.toByteArray(), version);
+        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.STREAM_REQUEST, bos.toByteArray(), version);
     }
 
     public String toString()

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Wed Jun 29 18:55:50 2011
@@ -760,7 +760,7 @@ public class CassandraServer implements 
             InetAddress address = InetAddress.getByName(endpoint);
             boolean endpointValid = null != Gossiper.instance.getEndpointStateForEndpoint(address);
             String datacenter = DatabaseDescriptor
-                    .getEndpointSnitch().getDatacenter(endpointValid ? address : FBUtilities.getLocalAddress());
+                    .getEndpointSnitch().getDatacenter(endpointValid ? address : FBUtilities.getBroadcastAddress());
             List<InetAddress> addresses = new ArrayList<InetAddress>();
             for(String ep : endpoints)
             {