You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/06/29 20:55:53 UTC
svn commit: r1141194 [1/2] - in /cassandra/trunk: conf/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/db/marshal/
src/java/org/apache/cassandra/db/migration/
src/java/org/apache/cassandra/dht/ src...
Author: brandonwilliams
Date: Wed Jun 29 18:55:50 2011
New Revision: 1141194
URL: http://svn.apache.org/viewvc?rev=1141194&view=rev
Log:
Add new config parameter, broadcast_address.
Patch by Vijay Parthasarathy, reviewed by brandonwilliams.
Modified:
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java
cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java
cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java
cassandra/trunk/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java
cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UUIDType.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java
cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/LocalStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
cassandra/trunk/src/java/org/apache/cassandra/tools/BulkLoader.java
cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
cassandra/trunk/src/java/org/apache/cassandra/utils/Mx4jTool.java
cassandra/trunk/src/java/org/apache/cassandra/utils/NodeId.java
cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/RowIterationTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/marshal/DynamicCompositeTypeTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/marshal/RoundTripTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/migration/SerializationsTest.java
cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
cassandra/trunk/test/unit/org/apache/cassandra/gms/SerializationsTest.java
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java
cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Wed Jun 29 18:55:50 2011
@@ -183,6 +183,10 @@ storage_port: 7000
# Setting this to 0.0.0.0 is always wrong.
listen_address: localhost
+# Address to broadcast to other Cassandra nodes
+# Leaving this blank will set it to the same value as listen_address
+# broadcast_address: 1.2.3.4
+
# The address to bind the Thrift RPC service to -- clients connect
# here. Unlike ListenAddress above, you *can* specify 0.0.0.0 here if
# you want Thrift to listen on all interfaces.
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Wed Jun 29 18:55:50 2011
@@ -63,6 +63,7 @@ public class Config
public Integer storage_port = 7000;
public String listen_address;
+ public String broadcast_address;
public String rpc_address;
public Integer rpc_port = 9160;
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Jun 29 18:55:50 2011
@@ -59,6 +59,7 @@ public class DatabaseDescriptor
private static IEndpointSnitch snitch;
private static InetAddress listenAddress; // leave null so we can fall through to getLocalHost
+ private static InetAddress broadcastAddress;
private static InetAddress rpcAddress;
private static SeedProvider seedProvider;
/* Current index into the above list of directories */
@@ -244,18 +245,31 @@ public class DatabaseDescriptor
/* Local IP or hostname to bind services to */
if (conf.listen_address != null)
{
- if (conf.listen_address.equals("0.0.0.0"))
+ try
+ {
+ listenAddress = InetAddress.getByName(conf.listen_address);
+ }
+ catch (UnknownHostException e)
{
- throw new ConfigurationException("listen_address must be a single interface. See http://wiki.apache.org/cassandra/FAQ#cant_listen_on_ip_any");
+ throw new ConfigurationException("Unknown listen_address '" + conf.listen_address + "'");
+ }
+ }
+
+ /* Gossip Address to broadcast */
+ if (conf.broadcast_address != null)
+ {
+ if (conf.broadcast_address.equals("0.0.0.0"))
+ {
+ throw new ConfigurationException("broadcast_address cannot be 0.0.0.0!");
}
try
{
- listenAddress = InetAddress.getByName(conf.listen_address);
+ broadcastAddress = InetAddress.getByName(conf.broadcast_address);
}
catch (UnknownHostException e)
{
- throw new ConfigurationException("Unknown listen_address '" + conf.listen_address + "'");
+ throw new ConfigurationException("Unknown broadcast_address '" + conf.broadcast_address + "'");
}
}
@@ -858,6 +872,16 @@ public class DatabaseDescriptor
return listenAddress;
}
+ public static InetAddress getBroadcastAddress()
+ {
+ return broadcastAddress;
+ }
+
+ public static void setBroadcastAddress(InetAddress broadcastAdd)
+ {
+ broadcastAddress = broadcastAdd;
+ }
+
public static InetAddress getRpcAddress()
{
return rpcAddress;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java Wed Jun 29 18:55:50 2011
@@ -222,7 +222,7 @@ public class CounterMutation implements
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
serializer().serialize(this, dos, version);
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.COUNTER_MUTATION, bos.toByteArray(), version);
+ return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.COUNTER_MUTATION, bos.toByteArray(), version);
}
public boolean shouldReplicateOnWrite()
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java Wed Jun 29 18:55:50 2011
@@ -51,7 +51,7 @@ public class CounterMutationVerbHandler
if (logger.isDebugEnabled())
logger.debug("Applying forwarded " + cm);
- String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getLocalAddress());
+ String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter).get();
WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true);
Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Wed Jun 29 18:55:50 2011
@@ -282,7 +282,7 @@ public class HintedHandOffManager implem
waited = 0;
// then wait for the correct schema version.
while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
- gossiper.getEndpointStateForEndpoint(FBUtilities.getLocalAddress()).getApplicationState(ApplicationState.SCHEMA).value))
+ gossiper.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddress()).getApplicationState(ApplicationState.SCHEMA).value))
{
Thread.sleep(1000);
waited += 1000;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java Wed Jun 29 18:55:50 2011
@@ -66,7 +66,7 @@ public class IndexScanCommand implements
{
throw new IOError(e);
}
- return new Message(FBUtilities.getLocalAddress(),
+ return new Message(FBUtilities.getBroadcastAddress(),
StorageService.Verb.INDEX_SCAN,
Arrays.copyOf(dob.getData(), dob.getLength()),
version);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Wed Jun 29 18:55:50 2011
@@ -91,7 +91,7 @@ public class RangeSliceCommand implement
{
DataOutputBuffer dob = new DataOutputBuffer();
serializer.serialize(this, dob, version);
- return new Message(FBUtilities.getLocalAddress(),
+ return new Message(FBUtilities.getBroadcastAddress(),
StorageService.Verb.RANGE_SLICE,
Arrays.copyOf(dob.getData(), dob.getLength()), version);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java Wed Jun 29 18:55:50 2011
@@ -49,7 +49,7 @@ public class RangeSliceReply
Row.serializer().serialize(row, dob, originalMessage.getVersion());
}
byte[] data = Arrays.copyOf(dob.getData(), dob.getLength());
- return originalMessage.getReply(FBUtilities.getLocalAddress(), data, originalMessage.getVersion());
+ return originalMessage.getReply(FBUtilities.getBroadcastAddress(), data, originalMessage.getVersion());
}
@Override
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Wed Jun 29 18:55:50 2011
@@ -53,7 +53,7 @@ public abstract class ReadCommand implem
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
ReadCommand.serializer().serialize(this, dos, version);
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.READ, bos.toByteArray(), version);
+ return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.READ, bos.toByteArray(), version);
}
public final QueryPath queryPath;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Wed Jun 29 18:55:50 2011
@@ -76,7 +76,7 @@ public class ReadVerbHandler implements
byte[] bytes = new byte[readCtx.bufOut_.getLength()];
System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
- Message response = message.getReply(FBUtilities.getLocalAddress(), bytes, message.getVersion());
+ Message response = message.getReply(FBUtilities.getBroadcastAddress(), bytes, message.getVersion());
if (logger_.isDebugEnabled())
logger_.debug(String.format("Read key %s; sending response to %s@%s",
ByteBufferUtil.bytesToHex(command.key), id, message.getFrom()));
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Wed Jun 29 18:55:50 2011
@@ -222,7 +222,7 @@ public class RowMutation implements IMut
public Message makeRowMutationMessage(StorageService.Verb verb, int version) throws IOException
{
- return new Message(FBUtilities.getLocalAddress(), verb, getSerializedBuffer(version), version);
+ return new Message(FBUtilities.getBroadcastAddress(), verb, getSerializedBuffer(version), version);
}
public synchronized byte[] getSerializedBuffer(int version) throws IOException
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Wed Jun 29 18:55:50 2011
@@ -88,7 +88,7 @@ public class RowMutationVerbHandler impl
// remove fwds from message to avoid infinite loop
message.removeHeader(RowMutation.FORWARD_HEADER);
- int bytesPerInetAddress = FBUtilities.getLocalAddress().getAddress().length;
+ int bytesPerInetAddress = FBUtilities.getBroadcastAddress().getAddress().length;
assert forwardBytes.length >= bytesPerInetAddress;
assert forwardBytes.length % bytesPerInetAddress == 0;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Wed Jun 29 18:55:50 2011
@@ -407,7 +407,7 @@ public class SystemTable
public static void writeCurrentLocalNodeId(NodeId oldNodeId, NodeId newNodeId)
{
long now = System.currentTimeMillis();
- ByteBuffer ip = ByteBuffer.wrap(FBUtilities.getLocalAddress().getAddress());
+ ByteBuffer ip = ByteBuffer.wrap(FBUtilities.getBroadcastAddress().getAddress());
ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, NODE_ID_CF);
cf.addColumn(new Column(newNodeId.bytes(), ip, now));
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java Wed Jun 29 18:55:50 2011
@@ -54,7 +54,7 @@ public class TruncateResponse
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
TruncateResponse.serializer().serialize(truncateResponseMessage, dos, original.getVersion());
- return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray(), original.getVersion());
+ return original.getReply(FBUtilities.getBroadcastAddress(), bos.toByteArray(), original.getVersion());
}
public TruncateResponse(String keyspace, String columnFamily, boolean success) {
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java Wed Jun 29 18:55:50 2011
@@ -72,7 +72,7 @@ public class Truncation implements Messa
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
serializer().serialize(this, dos, version);
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TRUNCATE, bos.toByteArray(), version);
+ return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.TRUNCATE, bos.toByteArray(), version);
}
public String toString()
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java Wed Jun 29 18:55:50 2011
@@ -49,7 +49,7 @@ public class WriteResponse
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
WriteResponse.serializer().serialize(writeResponseMessage, dos, original.getVersion());
- return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray(), original.getVersion());
+ return original.getReply(FBUtilities.getBroadcastAddress(), bos.toByteArray(), original.getVersion());
}
private final String table_;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java Wed Jun 29 18:55:50 2011
@@ -156,7 +156,7 @@ public class TimeUUIDType extends Abstra
}
else if (source.toLowerCase().equals("now"))
{
- idBytes = ByteBuffer.wrap(UUIDGen.decompose(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress())));
+ idBytes = ByteBuffer.wrap(UUIDGen.decompose(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress())));
}
// Milliseconds since epoch?
else if (source.matches("^\\d+$"))
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UUIDType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UUIDType.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UUIDType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UUIDType.java Wed Jun 29 18:55:50 2011
@@ -227,7 +227,7 @@ public class UUIDType extends AbstractUU
}
else if (source.toLowerCase().equals("now"))
{
- idBytes = ByteBuffer.wrap(UUIDGen.decompose(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress())));
+ idBytes = ByteBuffer.wrap(UUIDGen.decompose(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress())));
}
// Milliseconds since epoch?
else if (source.matches("^\\d+$"))
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java Wed Jun 29 18:55:50 2011
@@ -40,7 +40,7 @@ public class AddColumnFamily extends Mig
public AddColumnFamily(CFMetaData cfm) throws ConfigurationException, IOException
{
- super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+ super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), DatabaseDescriptor.getDefsVersion());
this.cfm = cfm;
KSMetaData ksm = DatabaseDescriptor.getTableDefinition(cfm.ksName);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java Wed Jun 29 18:55:50 2011
@@ -38,7 +38,7 @@ public class AddKeyspace extends Migrati
public AddKeyspace(KSMetaData ksm) throws ConfigurationException, IOException
{
- super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+ super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), DatabaseDescriptor.getDefsVersion());
if (DatabaseDescriptor.getTableDefinition(ksm.name) != null)
throw new ConfigurationException("Keyspace already exists.");
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java Wed Jun 29 18:55:50 2011
@@ -44,7 +44,7 @@ public class DropColumnFamily extends Mi
public DropColumnFamily(String tableName, String cfName) throws ConfigurationException, IOException
{
- super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+ super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), DatabaseDescriptor.getDefsVersion());
this.tableName = tableName;
this.cfName = cfName;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java Wed Jun 29 18:55:50 2011
@@ -41,7 +41,7 @@ public class DropKeyspace extends Migrat
public DropKeyspace(String name) throws ConfigurationException, IOException
{
- super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+ super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), DatabaseDescriptor.getDefsVersion());
this.name = name;
KSMetaData ksm = DatabaseDescriptor.getTableDefinition(name);
if (ksm == null)
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java Wed Jun 29 18:55:50 2011
@@ -47,7 +47,7 @@ public class RenameColumnFamily extends
// be called during deserialization of this migration.
public RenameColumnFamily(String tableName, String oldName, String newName) throws ConfigurationException, IOException
{
- super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+ super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), DatabaseDescriptor.getDefsVersion());
this.tableName = tableName;
this.oldName = oldName;
this.newName = newName;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java Wed Jun 29 18:55:50 2011
@@ -46,7 +46,7 @@ public class RenameKeyspace extends Migr
public RenameKeyspace(String oldName, String newName) throws ConfigurationException, IOException
{
- super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+ super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), DatabaseDescriptor.getDefsVersion());
this.oldName = oldName;
this.newName = newName;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java Wed Jun 29 18:55:50 2011
@@ -39,7 +39,7 @@ public class UpdateColumnFamily extends
/** assumes validation has already happened. That is, replacing oldCfm with newCfm is neither illegal or totally whackass. */
public UpdateColumnFamily(org.apache.cassandra.db.migration.avro.CfDef cf_def) throws ConfigurationException, IOException
{
- super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+ super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), DatabaseDescriptor.getDefsVersion());
KSMetaData ksm = DatabaseDescriptor.getTableDefinition(cf_def.keyspace.toString());
if (ksm == null)
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java Wed Jun 29 18:55:50 2011
@@ -40,7 +40,7 @@ public class UpdateKeyspace extends Migr
/** create migration based on thrift parameters */
public UpdateKeyspace(KSMetaData ksm) throws ConfigurationException, IOException
{
- super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+ super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), DatabaseDescriptor.getDefsVersion());
assert ksm != null;
assert ksm.cfMetaData() != null;
Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Wed Jun 29 18:55:50 2011
@@ -184,7 +184,7 @@ public class BootStrapper
});
InetAddress maxEndpoint = endpoints.get(endpoints.size() - 1);
- assert !maxEndpoint.equals(FBUtilities.getLocalAddress());
+ assert !maxEndpoint.equals(FBUtilities.getBroadcastAddress());
if (metadata.pendingRangeChanges(maxEndpoint) > 0)
throw new RuntimeException("Every node is a bootstrap source! Please specify an initial token manually or wait for an existing bootstrap operation to finish.");
@@ -218,7 +218,7 @@ public class BootStrapper
static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint)
{
- Message message = new Message(FBUtilities.getLocalAddress(),
+ Message message = new Message(FBUtilities.getBroadcastAddress(),
StorageService.Verb.BOOTSTRAP_TOKEN,
ArrayUtils.EMPTY_BYTE_ARRAY,
Gossiper.instance.getVersion(maxEndpoint));
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java Wed Jun 29 18:55:50 2011
@@ -110,7 +110,7 @@ public class FailureDetector implements
public boolean isAlive(InetAddress ep)
{
- if (ep.equals(FBUtilities.getLocalAddress()))
+ if (ep.equals(FBUtilities.getBroadcastAddress()))
return true;
EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(ep);
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Wed Jun 29 18:55:50 2011
@@ -108,9 +108,9 @@ public class Gossiper implements IFailur
MessagingService.instance().waitUntilListening();
/* Update the local heartbeat counter. */
- endpointStateMap.get(FBUtilities.getLocalAddress()).getHeartBeatState().updateHeartBeat();
+ endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat();
if (logger.isTraceEnabled())
- logger.trace("My heartbeat is now " + endpointStateMap.get(FBUtilities.getLocalAddress()).getHeartBeatState().getHeartBeatVersion());
+ logger.trace("My heartbeat is now " + endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().getHeartBeatVersion());
final List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
Gossiper.instance.makeRandomGossipDigest(gDigests);
@@ -210,8 +210,8 @@ public class Gossiper implements IFailur
public Set<InetAddress> getLiveMembers()
{
Set<InetAddress> liveMbrs = new HashSet<InetAddress>(liveEndpoints);
- if (!liveMbrs.contains(FBUtilities.getLocalAddress()))
- liveMbrs.add(FBUtilities.getLocalAddress());
+ if (!liveMbrs.contains(FBUtilities.getBroadcastAddress()))
+ liveMbrs.add(FBUtilities.getBroadcastAddress());
return liveMbrs;
}
@@ -338,7 +338,7 @@ public class Gossiper implements IFailur
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos, version);
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray(), version);
+ return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray(), version);
}
Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage, int version) throws IOException
@@ -346,7 +346,7 @@ public class Gossiper implements IFailur
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos, version);
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray(), version);
+ return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray(), version);
}
Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message, int version) throws IOException
@@ -354,7 +354,7 @@ public class Gossiper implements IFailur
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos, version);
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray(), version);
+ return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray(), version);
}
/**
@@ -414,7 +414,7 @@ public class Gossiper implements IFailur
int size = seeds.size();
if ( size > 0 )
{
- if ( size == 1 && seeds.contains(FBUtilities.getLocalAddress()) )
+ if ( size == 1 && seeds.contains(FBUtilities.getBroadcastAddress()) )
{
return;
}
@@ -441,7 +441,7 @@ public class Gossiper implements IFailur
Set<InetAddress> eps = endpointStateMap.keySet();
for ( InetAddress endpoint : eps )
{
- if ( endpoint.equals(FBUtilities.getLocalAddress()) )
+ if ( endpoint.equals(FBUtilities.getBroadcastAddress()) )
continue;
FailureDetector.instance.interpret(endpoint);
@@ -655,7 +655,7 @@ public class Gossiper implements IFailur
for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
{
InetAddress ep = entry.getKey();
- if ( ep.equals(FBUtilities.getLocalAddress()))
+ if ( ep.equals(FBUtilities.getBroadcastAddress()))
continue;
if (justRemovedEndpoints.containsKey(ep))
{
@@ -832,14 +832,14 @@ public class Gossiper implements IFailur
Set<InetAddress> seedHosts = DatabaseDescriptor.getSeeds();
for (InetAddress seed : seedHosts)
{
- if (seed.equals(FBUtilities.getLocalAddress()))
+ if (seed.equals(FBUtilities.getBroadcastAddress()))
continue;
seeds.add(seed);
}
/* initialize the heartbeat state for this localEndpoint */
maybeInitializeLocalState(generationNbr);
- EndpointState localState = endpointStateMap.get(FBUtilities.getLocalAddress());
+ EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
//notify snitches that Gossiper is about to start
DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
@@ -855,13 +855,13 @@ public class Gossiper implements IFailur
// initialize local HB state if needed.
public void maybeInitializeLocalState(int generationNbr)
{
- EndpointState localState = endpointStateMap.get(FBUtilities.getLocalAddress());
+ EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
if ( localState == null )
{
HeartBeatState hbState = new HeartBeatState(generationNbr);
localState = new EndpointState(hbState);
localState.markAlive();
- endpointStateMap.put(FBUtilities.getLocalAddress(), localState);
+ endpointStateMap.put(FBUtilities.getBroadcastAddress(), localState);
}
}
@@ -882,7 +882,7 @@ public class Gossiper implements IFailur
public void addLocalApplicationState(ApplicationState state, VersionedValue value)
{
- EndpointState epState = endpointStateMap.get(FBUtilities.getLocalAddress());
+ EndpointState epState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
assert epState != null;
epState.addApplicationState(state, value);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Wed Jun 29 18:55:50 2011
@@ -170,7 +170,7 @@ public abstract class AbstractReplicatio
//
// we do a 2nd pass on targets instead of using temporary storage,
// to optimize for the common case (everything was alive).
- InetAddress localAddress = FBUtilities.getLocalAddress();
+ InetAddress localAddress = FBUtilities.getBroadcastAddress();
for (InetAddress ep : targets)
{
if (map.containsKey(ep))
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Wed Jun 29 18:55:50 2011
@@ -131,7 +131,7 @@ public class DynamicEndpointSnitch exten
public void sortByProximity(final InetAddress address, List<InetAddress> addresses)
{
- assert address.equals(FBUtilities.getLocalAddress()); // we only know about ourself
+ assert address.equals(FBUtilities.getBroadcastAddress()); // we only know about ourself
if (BADNESS_THRESHOLD == 0)
{
sortByProximityWithScore(address, addresses);
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java Wed Jun 29 18:55:50 2011
@@ -81,14 +81,14 @@ public class Ec2Snitch extends AbstractN
public String getRack(InetAddress endpoint)
{
- if (endpoint.equals(FBUtilities.getLocalAddress()))
+ if (endpoint.equals(FBUtilities.getBroadcastAddress()))
return ec2zone;
return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RACK).value;
}
public String getDatacenter(InetAddress endpoint)
{
- if (endpoint.equals(FBUtilities.getLocalAddress()))
+ if (endpoint.equals(FBUtilities.getBroadcastAddress()))
return ec2region;
return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.DC).value;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/LocalStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/LocalStrategy.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/LocalStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/LocalStrategy.java Wed Jun 29 18:55:50 2011
@@ -37,7 +37,7 @@ public class LocalStrategy extends Abstr
public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
{
- return Arrays.asList(FBUtilities.getLocalAddress());
+ return Arrays.asList(FBUtilities.getBroadcastAddress());
}
public int getReplicationFactor()
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Wed Jun 29 18:55:50 2011
@@ -66,93 +66,94 @@ public class IncomingTcpConnection exten
MessagingService.validateMagic(input.readInt());
int header = input.readInt();
isStream = MessagingService.getBits(header, 3, 1) == 1;
- if (!isStream)
- // we should buffer
- input = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096));
version = MessagingService.getBits(header, 15, 8);
- if (logger.isDebugEnabled())
- logger.debug("Version for " + socket.getInetAddress() + " is " + version);
- }
- catch (IOException e)
- {
- close();
- throw new IOError(e);
- }
-
- if (version > MessagingService.version_)
- {
- // save the endpoint so gossip will reconnect to it
- Gossiper.instance.addSavedEndpoint(socket.getInetAddress());
- logger.info("Received " + (isStream ? "streaming " : "") + "connection from newer protocol version. Ignorning");
-
- // streaming connections are per-session and have a fixed version. we can't do anything with a new-version
- // stream connection, so drop it.
if (isStream)
{
- close();
- return;
- }
- // for non-streaming connections, continue to read the messages (and ignore them) until sender
- // starts sending correct-version messages (which it can do without reconnecting -- version is per-Message)
- }
- else
- {
- // only set version when <= to us, otherwise it's the responsibility of the other end to mimic us
- Gossiper.instance.setVersion(socket.getInetAddress(), version);
- }
-
- while (true)
- {
- try
- {
- if (isStream)
+ if (version == MessagingService.version_)
{
int size = input.readInt();
byte[] headerBytes = new byte[size];
input.readFully(headerBytes);
stream(StreamHeader.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(headerBytes)), version), input);
- break;
- }
+ }
else
{
- int size = input.readInt();
- byte[] contentBytes = new byte[size];
- // readFully allocates a direct buffer the size of the chunk it is asked to read,
- // so we cap that at CHUNK_SIZE. See https://issues.apache.org/jira/browse/CASSANDRA-2654
- int remainder = size % CHUNK_SIZE;
- for (int offset = 0; offset < size - remainder; offset += CHUNK_SIZE)
- input.readFully(contentBytes, offset, CHUNK_SIZE);
- input.readFully(contentBytes, size - remainder, remainder);
-
- if (version <= MessagingService.version_)
- {
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(contentBytes));
- String id = dis.readUTF();
- Message message = Message.serializer().deserialize(dis, version);
- MessagingService.instance().receive(message, id);
- }
+ // streaming connections are per-session and have a fixed version. we can't do anything with a new-version stream connection, so drop it.
+ logger.error("Received untranslated stream from newer protcol version. Terminating connection!");
}
- // prepare to read the next message
- MessagingService.validateMagic(input.readInt());
- int header = input.readInt();
- assert isStream == (MessagingService.getBits(header, 3, 1) == 1) : "Connections cannot change type: " + isStream;
- version = MessagingService.getBits(header, 15, 8);
+ // We are done with this connection....
+ return;
}
- catch (EOFException e)
+
+ // we should buffer
+ input = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096));
+ // Receive the first message to set the version.
+ Message msg = receiveMessage(input, version);
+ if (version > MessagingService.version_)
{
- if (logger.isTraceEnabled())
- logger.trace("eof reading from socket; closing", e);
- break;
+ // save the endpoint so gossip will reconnect to it
+ Gossiper.instance.addSavedEndpoint(socket.getInetAddress());
+ logger.info("Received " + (isStream ? "streaming " : "") + "connection from newer protocol version. Ignorning");
}
- catch (IOException e)
+ else if (msg != null)
{
- if (logger.isDebugEnabled())
- logger.debug("error reading from socket; closing", e);
- break;
+ Gossiper.instance.setVersion(msg.getFrom(), version);
}
+
+ // loop to get the next message.
+ while (true)
+ {
+ // prepare to read the next message
+ MessagingService.validateMagic(input.readInt());
+ header = input.readInt();
+ assert isStream == (MessagingService.getBits(header, 3, 1) == 1) : "Connections cannot change type: " + isStream;
+ version = MessagingService.getBits(header, 15, 8);
+ receiveMessage(input, version);
+ }
+ }
+ catch (EOFException e)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("eof reading from socket; closing", e);
+ // connection will be reset so no need to throw an exception.
+ }
+ catch (IOException e)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("IOError reading from socket; closing", e);
+ // throw to be logged elsewhere.
+ throw new IOError(e);
+ }
+ finally
+ {
+ // cleanup.
+ close();
}
+ }
- close();
+ private Message receiveMessage(DataInputStream input, int version) throws IOException
+ {
+ int size = input.readInt();
+ byte[] contentBytes = new byte[size];
+ // readFully allocates a direct buffer the size of the chunk it is asked to read,
+ // so we cap that at CHUNK_SIZE. See https://issues.apache.org/jira/browse/CASSANDRA-2654
+ int remainder = size % CHUNK_SIZE;
+ for (int offset = 0; offset < size - remainder; offset += CHUNK_SIZE)
+ input.readFully(contentBytes, offset, CHUNK_SIZE);
+ input.readFully(contentBytes, size - remainder, remainder);
+
+ // for non-streaming connections, continue to read the messages (and ignore them) until sender
+ // starts sending correct-version messages (which it can do without reconnecting -- version is per-Message)
+ if (version <= MessagingService.version_)
+ {
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(contentBytes));
+ String id = dis.readUTF();
+ Message message = Message.serializer().deserialize(dis, version);
+ MessagingService.instance().receive(message, id);
+ return message;
+ }
+ logger.info("Received connection from newer protocol version. Ignorning message.");
+ return null;
}
private void close()
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Wed Jun 29 18:55:50 2011
@@ -111,7 +111,7 @@ public class Message
public Message getInternalReply(byte[] body, int version)
{
- Header header = new Header(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE);
+ Header header = new Header(FBUtilities.getBroadcastAddress(), StorageService.Verb.INTERNAL_RESPONSE);
return new Message(header, body, version);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jun 29 18:55:50 2011
@@ -341,7 +341,7 @@ public final class MessagingService impl
private void sendOneWay(Message message, String id, InetAddress to)
{
if (logger_.isTraceEnabled())
- logger_.trace(FBUtilities.getLocalAddress() + " sending " + message.getVerb() + " to " + id + "@" + to);
+ logger_.trace(FBUtilities.getBroadcastAddress() + " sending " + message.getVerb() + " to " + id + "@" + to);
// do local deliveries
if ( message.getFrom().equals(to) )
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java Wed Jun 29 18:55:50 2011
@@ -43,7 +43,7 @@ public abstract class AbstractRowResolve
{
protected static Logger logger = LoggerFactory.getLogger(AbstractRowResolver.class);
- private static final Message FAKE_MESSAGE = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY, -1);
+ private static final Message FAKE_MESSAGE = new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY, -1);
protected final String table;
protected final ConcurrentMap<Message, ReadResponse> replies = new NonBlockingHashMap<Message, ReadResponse>();
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Wed Jun 29 18:55:50 2011
@@ -167,7 +167,7 @@ public class AntiEntropyService
if (!replicaSets.containsKey(range))
return Collections.emptySet();
Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(range));
- neighbors.remove(FBUtilities.getLocalAddress());
+ neighbors.remove(FBUtilities.getBroadcastAddress());
// Excluding all node with version <= 0.7 since they don't know how to
// create a correct merkle tree (they build it over the full range)
Iterator<InetAddress> iter = neighbors.iterator();
@@ -188,7 +188,7 @@ public class AntiEntropyService
*/
private void rendezvous(TreeRequest request, MerkleTree tree)
{
- InetAddress LOCAL = FBUtilities.getLocalAddress();
+ InetAddress LOCAL = FBUtilities.getBroadcastAddress();
// the rendezvous pairs for this session
Map<TreeRequest, TreePair> ctrees = rendezvousPairs(request.sessionid);
@@ -416,7 +416,7 @@ public class AntiEntropyService
public void run()
{
// respond to the request that triggered this validation
- AntiEntropyService.instance.respond(this, FBUtilities.getLocalAddress());
+ AntiEntropyService.instance.respond(this, FBUtilities.getBroadcastAddress());
}
}
@@ -443,7 +443,7 @@ public class AntiEntropyService
*/
public void run()
{
- InetAddress local = FBUtilities.getLocalAddress();
+ InetAddress local = FBUtilities.getBroadcastAddress();
// restore partitioners (in case we were serialized)
if (ltree.partitioner() == null)
@@ -540,7 +540,7 @@ public class AntiEntropyService
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
SERIALIZER.serialize(request, dos, version);
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TREE_REQUEST, bos.toByteArray(), version);
+ return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.TREE_REQUEST, bos.toByteArray(), version);
}
catch(IOException e)
{
@@ -802,7 +802,7 @@ public class AntiEntropyService
for (InetAddress endpoint : endpoints)
requests.put(AntiEntropyService.instance.request(getName(), endpoint, range, tablename, cfname), this);
// send but don't record an outstanding request to the local node
- AntiEntropyService.instance.request(getName(), FBUtilities.getLocalAddress(), range, tablename, cfname);
+ AntiEntropyService.instance.request(getName(), FBUtilities.getBroadcastAddress(), range, tablename, cfname);
}
logger.info("Waiting for repair requests: " + requests.keySet());
requestsMade.signalAll();
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java Wed Jun 29 18:55:50 2011
@@ -40,7 +40,7 @@ import org.apache.cassandra.utils.FBUtil
public class DatacenterReadCallback<T> extends ReadCallback<T>
{
private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
+ private static final String localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
public DatacenterReadCallback(IResponseResolver resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java Wed Jun 29 18:55:50 2011
@@ -51,7 +51,7 @@ public class DatacenterSyncWriteResponse
private static final String localdc;
static
{
- localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
+ localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
}
private final NetworkTopologyStrategy strategy;
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java Wed Jun 29 18:55:50 2011
@@ -48,7 +48,7 @@ public class DatacenterWriteResponseHand
private static final String localdc;
static
{
- localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
+ localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
}
protected DatacenterWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java Wed Jun 29 18:55:50 2011
@@ -212,7 +212,7 @@ public class MigrationManager implements
}
dout.close();
byte[] body = bout.toByteArray();
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.DEFINITIONS_UPDATE, body, version);
+ return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.DEFINITIONS_UPDATE, body, version);
}
// other half of this transformation is in MigrationManager.
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Wed Jun 29 18:55:50 2011
@@ -222,7 +222,7 @@ public class StorageLoadBalancer impleme
private double localLoad()
{
- Double load = loadInfo2_.get(FBUtilities.getLocalAddress());
+ Double load = loadInfo2_.get(FBUtilities.getBroadcastAddress());
return load == null ? 0 : load;
}
@@ -277,11 +277,11 @@ public class StorageLoadBalancer impleme
private boolean isANeighbour(InetAddress neighbour)
{
- InetAddress predecessor = StorageService.instance.getPredecessor(FBUtilities.getLocalAddress());
+ InetAddress predecessor = StorageService.instance.getPredecessor(FBUtilities.getBroadcastAddress());
if ( predecessor.equals(neighbour) )
return true;
- InetAddress successor = StorageService.instance.getSuccessor(FBUtilities.getLocalAddress());
+ InetAddress successor = StorageService.instance.getSuccessor(FBUtilities.getBroadcastAddress());
if ( successor.equals(neighbour) )
return true;
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Jun 29 18:55:50 2011
@@ -134,7 +134,7 @@ public class StorageProxy implements Sto
*/
public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, TimeoutException
{
- final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getLocalAddress());
+ final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
long startTime = System.nanoTime();
List<IWriteResponseHandler> responseHandlers = new ArrayList<IWriteResponseHandler>();
@@ -235,7 +235,7 @@ public class StorageProxy implements Sto
if (targets.size() == 1 && targets.iterator().next().equals(destination))
{
// unhinted writes
- if (destination.equals(FBUtilities.getLocalAddress()))
+ if (destination.equals(FBUtilities.getBroadcastAddress()))
{
if (insertLocalMessages)
insertLocal(rm, responseHandler);
@@ -378,7 +378,7 @@ public class StorageProxy implements Sto
{
InetAddress endpoint = findSuitableEndpoint(cm.getTable(), cm.key());
- if (endpoint.equals(FBUtilities.getLocalAddress()))
+ if (endpoint.equals(FBUtilities.getBroadcastAddress()))
{
return applyCounterMutationOnCoordinator(cm, localDataCenter);
}
@@ -405,7 +405,7 @@ public class StorageProxy implements Sto
private static InetAddress findSuitableEndpoint(String table, ByteBuffer key) throws UnavailableException
{
List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(table, key);
- DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
+ DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), endpoints);
if (endpoints.isEmpty())
throw new UnavailableException();
return endpoints.get(0);
@@ -509,7 +509,7 @@ public class StorageProxy implements Sto
logger.debug("Command/ConsistencyLevel is {}/{}", command, consistency_level);
List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
- DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
+ DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), endpoints);
RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
ReadCallback<Row> handler = getReadCallback(resolver, command, consistency_level, endpoints);
@@ -526,7 +526,7 @@ public class StorageProxy implements Sto
}
InetAddress dataPoint = handler.endpoints.get(0);
- if (dataPoint.equals(FBUtilities.getLocalAddress()))
+ if (dataPoint.equals(FBUtilities.getBroadcastAddress()))
{
if (logger.isDebugEnabled())
logger.debug("reading data locally");
@@ -544,7 +544,7 @@ public class StorageProxy implements Sto
MessageProducer producer = new CachingMessageProducer(digestCommand);
for (InetAddress digestPoint : handler.endpoints.subList(1, handler.endpoints.size()))
{
- if (digestPoint.equals(FBUtilities.getLocalAddress()))
+ if (digestPoint.equals(FBUtilities.getBroadcastAddress()))
{
if (logger.isDebugEnabled())
logger.debug("reading digest locally");
@@ -639,7 +639,7 @@ public class StorageProxy implements Sto
Table table = Table.open(command.table);
ReadResponse result = ReadVerbHandler.getResponse(command, command.getRow(table));
- MessagingService.instance().addLatency(FBUtilities.getLocalAddress(), System.currentTimeMillis() - start);
+ MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), System.currentTimeMillis() - start);
handler.response(result);
}
}
@@ -672,9 +672,9 @@ public class StorageProxy implements Sto
for (AbstractBounds range : ranges)
{
List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right);
- DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), liveEndpoints);
+ DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints);
- if (consistency_level == ConsistencyLevel.ONE && !liveEndpoints.isEmpty() && liveEndpoints.get(0).equals(FBUtilities.getLocalAddress()))
+ if (consistency_level == ConsistencyLevel.ONE && !liveEndpoints.isEmpty() && liveEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
{
if (logger.isDebugEnabled())
logger.debug("local range slice");
@@ -773,7 +773,7 @@ public class StorageProxy implements Sto
// an empty message acts as a request to the SchemaCheckVerbHandler.
for (InetAddress endpoint : liveHosts)
{
- Message message = new Message(FBUtilities.getLocalAddress(),
+ Message message = new Message(FBUtilities.getBroadcastAddress(),
StorageService.Verb.SCHEMA_CHECK,
ArrayUtils.EMPTY_BYTE_ARRAY,
Gossiper.instance.getVersion(endpoint));
@@ -953,7 +953,7 @@ public class StorageProxy implements Sto
for (AbstractBounds range : ranges)
{
List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, range.right);
- DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), liveEndpoints);
+ DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints);
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(keyspace, liveEndpoints);
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Jun 29 18:55:50 2011
@@ -176,12 +176,12 @@ public class StorageService implements I
public Collection<Range> getLocalRanges(String table)
{
- return getRangesForEndpoint(table, FBUtilities.getLocalAddress());
+ return getRangesForEndpoint(table, FBUtilities.getBroadcastAddress());
}
public Range getLocalPrimaryRange()
{
- return getPrimaryRangeForEndpoint(FBUtilities.getLocalAddress());
+ return getPrimaryRangeForEndpoint(FBUtilities.getBroadcastAddress());
}
private Set<InetAddress> replicatingNodes = Collections.synchronizedSet(new HashSet<InetAddress>());
@@ -217,7 +217,7 @@ public class StorageService implements I
if (logger_.isDebugEnabled())
logger_.debug("Setting token to {}", token);
SystemTable.updateToken(token);
- tokenMetadata_.updateNormalToken(token, FBUtilities.getLocalAddress());
+ tokenMetadata_.updateNormalToken(token, FBUtilities.getBroadcastAddress());
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.normal(getLocalToken()));
setMode("Normal", false);
}
@@ -431,7 +431,6 @@ public class StorageService implements I
Gossiper.instance.register(this);
Gossiper.instance.register(migrationManager);
Gossiper.instance.start(SystemTable.incrementAndGetGeneration()); // needed for node-ring gathering.
-
MessagingService.instance().listen(FBUtilities.getLocalAddress());
StorageLoadBalancer.instance.startBroadcasting();
MigrationManager.passiveAnnounce(DatabaseDescriptor.getDefsVersion());
@@ -440,19 +439,19 @@ public class StorageService implements I
HintedHandOffManager.instance.registerMBean();
if (DatabaseDescriptor.isAutoBootstrap()
- && DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress())
+ && DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress())
&& !SystemTable.isBootstrapped())
logger_.info("This node will not auto bootstrap because it is configured to be a seed node.");
Token token;
if (DatabaseDescriptor.isAutoBootstrap()
- && !(DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress()) || SystemTable.isBootstrapped()))
+ && !(DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()) || SystemTable.isBootstrapped()))
{
setMode("Joining: getting load and schema information", true);
StorageLoadBalancer.instance.waitForLoadInfo();
if (logger_.isDebugEnabled())
logger_.debug("... got load + schema info");
- if (tokenMetadata_.isMember(FBUtilities.getLocalAddress()))
+ if (tokenMetadata_.isMember(FBUtilities.getBroadcastAddress()))
{
String s = "This node is already a member of the token ring; bootstrap aborted. (If replacing a dead node, remove the old one from the ring first.)";
throw new UnsupportedOperationException(s);
@@ -542,7 +541,7 @@ public class StorageService implements I
throw new AssertionError(e);
}
setMode("Bootstrapping", true);
- new BootStrapper(FBUtilities.getLocalAddress(), token, tokenMetadata_).bootstrap(); // handles token update
+ new BootStrapper(FBUtilities.getBroadcastAddress(), token, tokenMetadata_).bootstrap(); // handles token update
}
public boolean isBootstrapMode()
@@ -873,7 +872,7 @@ public class StorageService implements I
if (removeEndpoint == null)
return;
- if (removeEndpoint.equals(FBUtilities.getLocalAddress()))
+ if (removeEndpoint.equals(FBUtilities.getBroadcastAddress()))
{
logger_.info("Received removeToken gossip about myself. Is this node a replacement for a removed one?");
return;
@@ -1026,7 +1025,7 @@ public class StorageService implements I
*/
private Multimap<InetAddress, Range> getNewSourceRanges(String table, Set<Range> ranges)
{
- InetAddress myAddress = FBUtilities.getLocalAddress();
+ InetAddress myAddress = FBUtilities.getBroadcastAddress();
Multimap<Range, InetAddress> rangeAddresses = Table.open(table).getReplicationStrategy().getRangeAddresses(tokenMetadata_);
Multimap<InetAddress, Range> sourceRanges = HashMultimap.create();
IFailureDetector failureDetector = FailureDetector.instance;
@@ -1093,7 +1092,7 @@ public class StorageService implements I
final Multimap<InetAddress, String> fetchSources = HashMultimap.create();
Multimap<String, Map.Entry<InetAddress, Collection<Range>>> rangesToFetch = HashMultimap.create();
- final InetAddress myAddress = FBUtilities.getLocalAddress();
+ final InetAddress myAddress = FBUtilities.getBroadcastAddress();
for (String table : DatabaseDescriptor.getNonSystemTables())
{
@@ -1232,7 +1231,7 @@ public class StorageService implements I
map.put(entry.getKey().getHostAddress(), FileUtils.stringifyFileSize(entry.getValue()));
}
// gossiper doesn't see its own updates, so we need to special-case the local node
- map.put(FBUtilities.getLocalAddress().getHostAddress(), getLoadString());
+ map.put(FBUtilities.getBroadcastAddress().getHostAddress(), getLoadString());
return map;
}
@@ -1313,7 +1312,7 @@ public class StorageService implements I
public int getCurrentGenerationNumber()
{
- return Gossiper.instance.getCurrentGenerationNumber(FBUtilities.getLocalAddress());
+ return Gossiper.instance.getCurrentGenerationNumber(FBUtilities.getBroadcastAddress());
}
public void forceTableCleanup(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
@@ -1717,19 +1716,19 @@ public class StorageService implements I
private void startLeaving()
{
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.leaving(getLocalToken()));
- tokenMetadata_.addLeavingEndpoint(FBUtilities.getLocalAddress());
+ tokenMetadata_.addLeavingEndpoint(FBUtilities.getBroadcastAddress());
calculatePendingRanges();
}
public void decommission() throws InterruptedException
{
- if (!tokenMetadata_.isMember(FBUtilities.getLocalAddress()))
+ if (!tokenMetadata_.isMember(FBUtilities.getBroadcastAddress()))
throw new UnsupportedOperationException("local node is not a member of the token ring yet");
if (tokenMetadata_.cloneAfterAllLeft().sortedTokens().size() < 2)
throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless");
for (String table : DatabaseDescriptor.getNonSystemTables())
{
- if (tokenMetadata_.getPendingRanges(table, FBUtilities.getLocalAddress()).size() > 0)
+ if (tokenMetadata_.getPendingRanges(table, FBUtilities.getBroadcastAddress()).size() > 0)
throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
}
@@ -1756,7 +1755,7 @@ public class StorageService implements I
private void leaveRing()
{
SystemTable.setBootstrapped(false);
- tokenMetadata_.removeEndpoint(FBUtilities.getLocalAddress());
+ tokenMetadata_.removeEndpoint(FBUtilities.getBroadcastAddress());
calculatePendingRanges();
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalToken()));
@@ -1777,7 +1776,7 @@ public class StorageService implements I
for (final String table : DatabaseDescriptor.getNonSystemTables())
{
- Multimap<Range, InetAddress> rangesMM = getChangedRangesForLeaving(table, FBUtilities.getLocalAddress());
+ Multimap<Range, InetAddress> rangesMM = getChangedRangesForLeaving(table, FBUtilities.getBroadcastAddress());
if (logger_.isDebugEnabled())
logger_.debug("Ranges needing transfer are [" + StringUtils.join(rangesMM.keySet(), ",") + "]");
@@ -1825,7 +1824,7 @@ public class StorageService implements I
throw new IOException("target token " + newToken + " is already owned by another node.");
// address of the current node
- InetAddress localAddress = FBUtilities.getLocalAddress();
+ InetAddress localAddress = FBUtilities.getBroadcastAddress();
List<String> tablesToProcess = DatabaseDescriptor.getNonSystemTables();
// checking if data is moving to this node
@@ -1988,7 +1987,7 @@ public class StorageService implements I
*/
public void removeToken(String tokenString)
{
- InetAddress myAddress = FBUtilities.getLocalAddress();
+ InetAddress myAddress = FBUtilities.getBroadcastAddress();
Token localToken = tokenMetadata_.getToken(myAddress);
Token token = partitioner.getTokenFactory().fromString(tokenString);
InetAddress endpoint = tokenMetadata_.getEndpoint(token);
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Wed Jun 29 18:55:50 2011
@@ -67,7 +67,7 @@ public class StreamIn
if (logger.isDebugEnabled())
logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", "));
StreamInSession session = StreamInSession.create(source, callback);
- StreamRequestMessage srm = new StreamRequestMessage(FBUtilities.getLocalAddress(),
+ StreamRequestMessage srm = new StreamRequestMessage(FBUtilities.getBroadcastAddress(),
ranges,
tableName,
columnFamilies,
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java Wed Jun 29 18:55:50 2011
@@ -59,7 +59,7 @@ class StreamReply implements MessageProd
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
serializer.serialize(this, dos, version);
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAM_REPLY, bos.toByteArray(), version);
+ return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.STREAM_REPLY, bos.toByteArray(), version);
}
@Override
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java Wed Jun 29 18:55:50 2011
@@ -106,7 +106,7 @@ class StreamRequestMessage implements Me
{
throw new IOError(e);
}
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAM_REQUEST, bos.toByteArray(), version);
+ return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.STREAM_REQUEST, bos.toByteArray(), version);
}
public String toString()
Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1141194&r1=1141193&r2=1141194&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Wed Jun 29 18:55:50 2011
@@ -760,7 +760,7 @@ public class CassandraServer implements
InetAddress address = InetAddress.getByName(endpoint);
boolean endpointValid = null != Gossiper.instance.getEndpointStateForEndpoint(address);
String datacenter = DatabaseDescriptor
- .getEndpointSnitch().getDatacenter(endpointValid ? address : FBUtilities.getLocalAddress());
+ .getEndpointSnitch().getDatacenter(endpointValid ? address : FBUtilities.getBroadcastAddress());
List<InetAddress> addresses = new ArrayList<InetAddress>();
for(String ep : endpoints)
{