You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/21 20:26:04 UTC
svn commit: r828130 [1/3] - in /incubator/cassandra/trunk:
contrib/bmt_example/
contrib/property_snitch/src/java/org/apache/cassandra/locator/
src/java/org/apache/cassandra/client/ src/java/org/apache/cassandra/config/
src/java/org/apache/cassandra/db/...
Author: jbellis
Date: Wed Oct 21 18:26:02 2009
New Revision: 828130
URL: http://svn.apache.org/viewvc?rev=828130&view=rev
Log:
convert EndPoint to InetAddress (removes MembershipCleaner, the code to drop a node from the cluster entirely. we're probably going to want to resurrect that at some point)
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-498
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ConnectionStatistics.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/EndPoint.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
Modified:
incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
incubator/cassandra/trunk/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureNotification.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/CompactEndPointSerializationHelper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/CompactEndPointSerializationHelperTest.java
Modified: incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java (original)
+++ incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java Wed Oct 21 18:26:02 2009
@@ -59,7 +59,9 @@
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.dht.BigIntegerToken;
import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.SelectorManager;
@@ -113,7 +115,16 @@
for (String token : this.tokens)
{
String[] values = token.split(":");
- StorageService.instance().updateTokenMetadata(new BigIntegerToken(new BigInteger(values[0])),new EndPoint(values[1], 7000));
+ InetAddress address;
+ try
+ {
+ address = InetAddress.getByName(values[1]);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ StorageService.instance().updateTokenMetadataUnsafe(new BigIntegerToken(new BigInteger(values[0])), address);
}
}
public void close()
@@ -160,10 +171,10 @@
/* Get serialized message to send to cluster */
message = createMessage(Keyspace, key.toString(), CFName, columnFamilies);
- for (EndPoint endpoint: StorageService.instance().getReadStorageEndPoints(key.toString()))
+ for (InetAddress endpoint: StorageService.instance().getReadStorageEndPoints(key.toString()))
{
/* Send message to end point */
- MessagingService.getMessagingInstance().sendOneWay(message, endpoint);
+ MessagingService.instance().sendOneWay(message, endpoint);
}
output.collect(key, new Text(" inserted into Cassandra node(s)"));
@@ -234,7 +245,7 @@
throw new RuntimeException(e);
}
}
- rm = new RowMutation(Keyspace,StorageService.getPartitioner().decorateKey(Key));
+ rm = new RowMutation(Keyspace, Key);
rm.add(baseColumnFamily);
try
Modified: incubator/cassandra/trunk/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java (original)
+++ incubator/cassandra/trunk/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java Wed Oct 21 18:26:02 2009
@@ -30,7 +30,7 @@
import javax.management.ObjectName;
import org.apache.cassandra.locator.EndPointSnitch;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
@@ -84,7 +84,7 @@
*
* @return a array of string with the first index being the data center and the second being the rack
*/
- public String[] getEndPointInfo(EndPoint endPoint) {
+ public String[] getEndPointInfo(InetAddress endPoint) {
String key = endPoint.toString();
String value = hostProperties.getProperty(key);
if (value == null)
@@ -107,7 +107,7 @@
* @param endPoint the endPoint to process
* @return string of data center
*/
- public String getDataCenterForEndPoint(EndPoint endPoint) {
+ public String getDataCenterForEndPoint(InetAddress endPoint) {
return getEndPointInfo(endPoint)[0];
}
@@ -118,12 +118,12 @@
*
* @return string of rack
*/
- public String getRackForEndPoint(EndPoint endPoint) {
+ public String getRackForEndPoint(InetAddress endPoint) {
return getEndPointInfo(endPoint)[1];
}
@Override
- public boolean isInSameDataCenter(EndPoint host, EndPoint host2)
+ public boolean isInSameDataCenter(InetAddress host, InetAddress host2)
throws UnknownHostException {
if (runInBaseMode)
{
@@ -133,7 +133,7 @@
}
@Override
- public boolean isOnSameRack(EndPoint host, EndPoint host2)
+ public boolean isOnSameRack(InetAddress host, InetAddress host2)
throws UnknownHostException {
if (runInBaseMode)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java Wed Oct 21 18:26:02 2009
@@ -26,7 +26,9 @@
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
import org.apache.cassandra.service.Cassandra;
import org.apache.cassandra.service.CassandraServer;
import org.apache.cassandra.service.StorageService;
@@ -52,7 +54,10 @@
public RingCache()
{
- seeds_ = DatabaseDescriptor.getSeeds();
+ for (InetAddress seed : DatabaseDescriptor.getSeeds())
+ {
+ seeds_.add(seed.getHostAddress());
+ }
refreshEndPointMap();
}
@@ -69,14 +74,21 @@
Map<String,String> tokenToHostMap = (Map<String,String>) new JSONTokener(client.get_string_property(CassandraServer.TOKEN_MAP)).nextValue();
- HashMap<Token, EndPoint> tokenEndpointMap = new HashMap<Token, EndPoint>();
- Map<EndPoint, Token> endpointTokenMap = new HashMap<EndPoint, Token>();
+ HashMap<Token, InetAddress> tokenEndpointMap = new HashMap<Token, InetAddress>();
+ Map<InetAddress, Token> endpointTokenMap = new HashMap<InetAddress, Token>();
for (Map.Entry<String,String> entry : tokenToHostMap.entrySet())
{
Token token = StorageService.getPartitioner().getTokenFactory().fromString(entry.getKey());
String host = entry.getValue();
- tokenEndpointMap.put(token, new EndPoint(host, port_));
- endpointTokenMap.put(new EndPoint(host, port_), token);
+ try
+ {
+ tokenEndpointMap.put(token, InetAddress.getByName(host));
+ endpointTokenMap.put(InetAddress.getByName(host), token);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new AssertionError(e); // host strings are IPs
+ }
}
TokenMetadata tokenMetadata = new TokenMetadata(tokenEndpointMap, endpointTokenMap, null);
@@ -100,7 +112,7 @@
}
}
- public EndPoint[] getEndPoint(String key)
+ public InetAddress[] getEndPoint(String key)
{
return nodePicker_.getReadStorageEndPoints(partitioner_.getToken(key));
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Oct 21 18:26:02 2009
@@ -35,6 +35,7 @@
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
+import java.net.InetAddress;
public class DatabaseDescriptor
{
@@ -52,12 +53,12 @@
private static int controlPort_ = 7001;
private static int thriftPort_ = 9160;
private static boolean thriftFramed_ = false;
- private static String listenAddress_; // leave null so we can fall through to getLocalHost
- private static String thriftAddress_;
+ private static InetAddress listenAddress_; // leave null so we can fall through to getLocalHost
+ private static InetAddress thriftAddress_;
private static String clusterName_ = "Test";
private static int replicationFactor_ = 3;
private static long rpcTimeoutInMillis_ = 2000;
- private static Set<String> seeds_ = new HashSet<String>();
+ private static Set<InetAddress> seeds_ = new HashSet<InetAddress>();
/* Keeps the list of data file directories */
private static String[] dataFileDirectories_;
/* Current index into the above list of directories */
@@ -276,12 +277,12 @@
/* Local IP or hostname to bind services to */
String listenAddress = xmlUtils.getNodeValue("/Storage/ListenAddress");
if ( listenAddress != null)
- listenAddress_ = listenAddress;
+ listenAddress_ = InetAddress.getByName(listenAddress);
/* Local IP or hostname to bind thrift server to */
String thriftAddress = xmlUtils.getNodeValue("/Storage/ThriftAddress");
if ( thriftAddress != null )
- thriftAddress_ = thriftAddress;
+ thriftAddress_ = InetAddress.getByName(thriftAddress);
/* UDP port for control messages */
port = xmlUtils.getNodeValue("/Storage/ControlPort");
@@ -538,7 +539,7 @@
}
for( int i = 0; i < seeds.length; ++i )
{
- seeds_.add( seeds[i] );
+ seeds_.add(InetAddress.getByName(seeds[i]));
}
}
catch (ConfigurationException e)
@@ -843,7 +844,7 @@
logFileDirectory_ = logLocation;
}
- public static Set<String> getSeeds()
+ public static Set<InetAddress> getSeeds()
{
return seeds_;
}
@@ -918,12 +919,12 @@
}
}
- public static String getListenAddress()
+ public static InetAddress getListenAddress()
{
return listenAddress_;
}
- public static String getThriftAddress()
+ public static InetAddress getThriftAddress()
{
return thriftAddress_;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Oct 21 18:26:02 2009
@@ -36,7 +36,7 @@
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.*;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
@@ -272,7 +272,7 @@
* This method forces a compaction of the SSTables on disk. We wait
* for the process to complete by waiting on a future pointer.
*/
- List<SSTableReader> forceAntiCompaction(List<Range> ranges, EndPoint target)
+ List<SSTableReader> forceAntiCompaction(List<Range> ranges, InetAddress target)
{
assert ranges != null;
Future<List<SSTableReader>> futurePtr = CompactionManager.instance().submit(ColumnFamilyStore.this, ranges, target);
@@ -704,7 +704,7 @@
return maxFile;
}
- List<SSTableReader> doAntiCompaction(List<Range> ranges, EndPoint target) throws IOException
+ List<SSTableReader> doAntiCompaction(List<Range> ranges, InetAddress target) throws IOException
{
return doFileAntiCompaction(ssTables_.getSSTables(), ranges, target);
}
@@ -757,7 +757,7 @@
* @return
* @throws IOException
*/
- List<SSTableReader> doFileAntiCompaction(Collection<SSTableReader> sstables, Collection<Range> ranges, EndPoint target) throws IOException
+ List<SSTableReader> doFileAntiCompaction(Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress target) throws IOException
{
logger_.info("AntiCompacting [" + StringUtils.join(sstables, ",") + "]");
// Calculate the expected compacted filesize
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed Oct 21 18:26:02 2009
@@ -34,7 +34,7 @@
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
public class CompactionManager implements CompactionManagerMBean
{
@@ -75,9 +75,9 @@
{
private ColumnFamilyStore columnFamilyStore_;
private List<Range> ranges_;
- private EndPoint target_;
+ private InetAddress target_;
- FileCompactor2(ColumnFamilyStore columnFamilyStore, List<Range> ranges, EndPoint target)
+ FileCompactor2(ColumnFamilyStore columnFamilyStore, List<Range> ranges, InetAddress target)
{
columnFamilyStore_ = columnFamilyStore;
ranges_ = ranges;
@@ -187,7 +187,7 @@
compactor_.submit(new CleanupCompactor(columnFamilyStore));
}
- public Future<List<SSTableReader>> submit(ColumnFamilyStore columnFamilyStore, List<Range> ranges, EndPoint target)
+ public Future<List<SSTableReader>> submit(ColumnFamilyStore columnFamilyStore, List<Range> ranges, InetAddress target)
{
return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges, target) );
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java Wed Oct 21 18:26:02 2009
@@ -26,8 +26,8 @@
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.log4j.Logger;
@@ -52,7 +52,7 @@
{
dos.writeUTF(sstable.getFilename());
}
- Message response = message.getReply( StorageService.getLocalStorageEndPoint(), bos.toByteArray());
+ Message response = message.getReply(FBUtilities.getLocalAddress(), bos.toByteArray());
MessagingService.instance().sendOneWay(response, message.getFrom());
}
catch (IOException ex)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Wed Oct 21 18:26:02 2009
@@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Lock;
@@ -32,7 +33,7 @@
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.*;
@@ -100,9 +101,8 @@
return instance_;
}
- private static boolean sendMessage(String endpointAddress, String tableName, String key) throws DigestMismatchException, TimeoutException, IOException, InvalidRequestException
+ private static boolean sendMessage(InetAddress endPoint, String tableName, String key) throws DigestMismatchException, TimeoutException, IOException, InvalidRequestException
{
- EndPoint endPoint = new EndPoint(endpointAddress, DatabaseDescriptor.getStoragePort());
if (!FailureDetector.instance().isAlive(endPoint))
{
return false;
@@ -113,7 +113,7 @@
RowMutation rm = new RowMutation(tableName, row);
Message message = rm.makeRowMutationMessage();
QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
- MessagingService.instance().sendRR(message, new EndPoint[]{ endPoint }, quorumResponseHandler);
+ MessagingService.instance().sendRR(message, new InetAddress[]{ endPoint }, quorumResponseHandler);
return quorumResponseHandler.get();
}
@@ -161,8 +161,7 @@
int deleted = 0;
for (IColumn endpoint : endpoints)
{
- String endpointStr = new String(endpoint.name(), "UTF-8");
- if (sendMessage(endpointStr, tableName, keyStr))
+ if (sendMessage(InetAddress.getByAddress(endpoint.name()), tableName, keyStr))
{
deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), System.currentTimeMillis());
deleted++;
@@ -181,12 +180,12 @@
logger_.debug("Finished deliverAllHints");
}
- private static void deliverHintsToEndpoint(EndPoint endPoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
+ private static void deliverHintsToEndpoint(InetAddress endPoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
{
if (logger_.isDebugEnabled())
logger_.debug("Started hinted handoff for endPoint " + endPoint);
- String targetEPBytes = endPoint.getHost();
+ byte[] targetEPBytes = endPoint.getAddress();
// 1. Scan through all the keys that we need to handoff
// 2. For each key read the list of recipients if the endpoint matches send
// 3. Delete that recipient from the key if write was successful
@@ -206,7 +205,7 @@
Collection<IColumn> endpoints = keyColumn.getSubColumns();
for (IColumn hintEndPoint : endpoints)
{
- if (new String(hintEndPoint.name(), "UTF-8").equals(targetEPBytes) && sendMessage(endPoint.getHost(), null, keyStr))
+ if (Arrays.equals(hintEndPoint.name(), targetEPBytes) && sendMessage(endPoint, null, keyStr))
{
if (endpoints.size() == 1)
{
@@ -256,7 +255,7 @@
* When we learn that some endpoint is back up we deliver the data
* to him via an event driven mechanism.
*/
- public void deliverHints(final EndPoint to)
+ public void deliverHints(final InetAddress to)
{
Runnable r = new Runnable()
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java Wed Oct 21 18:26:02 2009
@@ -22,16 +22,13 @@
import java.io.IOException;
import java.io.DataInputStream;
import java.util.Arrays;
-import java.util.List;
-import java.util.Collections;
-
-import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
public class RangeCommand
{
@@ -56,7 +53,7 @@
{
DataOutputBuffer dob = new DataOutputBuffer();
serializer.serialize(this, dob);
- return new Message(StorageService.getLocalStorageEndPoint(),
+ return new Message(FBUtilities.getLocalAddress(),
StorageService.readStage_,
StorageService.rangeVerbHandler_,
Arrays.copyOf(dob.getData(), dob.getLength()));
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java Wed Oct 21 18:26:02 2009
@@ -26,7 +26,7 @@
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
public class RangeReply
{
@@ -49,7 +49,7 @@
dob.writeUTF(key);
}
byte[] data = Arrays.copyOf(dob.getData(), dob.getLength());
- return originalMessage.getReply(StorageService.getLocalStorageEndPoint(), data);
+ return originalMessage.getReply(FBUtilities.getLocalAddress(), data);
}
@Override
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Wed Oct 21 18:26:02 2009
@@ -30,6 +30,7 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.utils.FBUtilities;
public abstract class ReadCommand
@@ -52,7 +53,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
ReadCommand.serializer().serialize(this, dos);
- return new Message(StorageService.getLocalStorageEndPoint(), StorageService.readStage_, StorageService.readVerbHandler_, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), StorageService.readStage_, StorageService.readVerbHandler_, bos.toByteArray());
}
public final QueryPath queryPath;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Wed Oct 21 18:26:02 2009
@@ -22,10 +22,12 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
import org.apache.commons.lang.ArrayUtils;
@@ -53,7 +55,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
ReadResponse.serializer().serialize(readResponse, dos);
- Message message = new Message(StorageService.getLocalStorageEndPoint(), MessagingService.responseStage_, MessagingService.responseVerbHandler_, bos.toByteArray());
+ Message message = new Message(FBUtilities.getLocalAddress(), MessagingService.responseStage_, MessagingService.responseVerbHandler_, bos.toByteArray());
return message;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Wed Oct 21 18:26:02 2009
@@ -24,7 +24,7 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -97,7 +97,7 @@
byte[] bytes = new byte[readCtx.bufOut_.getLength()];
System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
- Message response = message.getReply(StorageService.getLocalStorageEndPoint(), bytes);
+ Message response = message.getReply(FBUtilities.getLocalAddress(), bytes);
if (logger_.isDebugEnabled())
logger_.debug("Read key " + readCommand.key + "; sending response to " + message.getMessageId() + "@" + message.getFrom());
MessagingService.instance().sendOneWay(response, message.getFrom());
@@ -116,9 +116,9 @@
private void doReadRepair(Row row, ReadCommand readCommand)
{
- List<EndPoint> endpoints = StorageService.instance().getLiveReadStorageEndPoints(readCommand.key);
- /* Remove the local storage endpoint from the list. */
- endpoints.remove( StorageService.getLocalStorageEndPoint() );
+ List<InetAddress> endpoints = StorageService.instance().getLiveReadStorageEndPoints(readCommand.key);
+ /* Remove the local storage endpoint from the list. */
+ endpoints.remove(FBUtilities.getLocalAddress());
if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
StorageService.instance().doConsistencyCheck(row, endpoints, readCommand);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Wed Oct 21 18:26:02 2009
@@ -36,7 +36,7 @@
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.*;
import org.apache.cassandra.utils.FBUtilities;
@@ -105,9 +105,9 @@
return modifications_.values();
}
- void addHints(String key, String host) throws IOException
+ void addHints(String key, byte[] host) throws IOException
{
- QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, key.getBytes("UTF-8"), host.getBytes("UTF-8"));
+ QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, key.getBytes("UTF-8"), host);
add(path, ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis());
}
@@ -224,9 +224,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
serializer().serialize(this, dos);
- EndPoint local = StorageService.getLocalStorageEndPoint();
- EndPoint from = (local != null) ? local : new EndPoint(FBUtilities.getHostAddress(), 7000);
- return new Message(from, StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
}
public static RowMutation getRowMutation(String table, String key, Map<String, List<ColumnOrSuperColumn>> cfmap)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java Wed Oct 21 18:26:02 2009
@@ -27,7 +27,6 @@
import javax.xml.bind.annotation.XmlElement;
import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
@@ -52,9 +51,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
RowMutationMessage.serializer().serialize(this, dos);
- EndPoint local = StorageService.getLocalStorageEndPoint();
- EndPoint from = ( local != null ) ? local : new EndPoint(FBUtilities.getHostAddress(), 7000);
- return new Message(from, StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
}
@XmlElement(name="RowMutation")
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Wed Oct 21 18:26:02 2009
@@ -21,7 +21,7 @@
import java.io.*;
import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
@@ -49,12 +49,12 @@
byte[] hintedBytes = message.getHeader(RowMutation.HINT);
if ( hintedBytes != null && hintedBytes.length > 0 )
{
- EndPoint hint = EndPoint.getByAddress(hintedBytes);
+ InetAddress hint = InetAddress.getByAddress(hintedBytes);
if (logger_.isDebugEnabled())
logger_.debug("Adding hint for " + hint);
/* add necessary hints to this mutation */
RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.getTable());
- hintedMutation.addHints(rm.key(), hint.getHost());
+ hintedMutation.addHints(rm.key(), hintedBytes);
hintedMutation.apply();
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Wed Oct 21 18:26:02 2009
@@ -32,7 +32,7 @@
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
public class SystemTable
{
@@ -60,11 +60,11 @@
/**
* Record token being used by another node
*/
- public static synchronized void updateToken(EndPoint ep, Token token) throws IOException
+ public static synchronized void updateToken(InetAddress ep, Token token) throws IOException
{
IPartitioner p = StorageService.getPartitioner();
ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, STATUS_CF);
- cf.addColumn(new Column(ep.getHost().getBytes("UTF-8"), p.getTokenFactory().toByteArray(token), System.currentTimeMillis()));
+ cf.addColumn(new Column(ep.getAddress(), p.getTokenFactory().toByteArray(token), System.currentTimeMillis()));
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
rm.add(cf);
rm.apply();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Oct 21 18:26:02 2009
@@ -30,7 +30,7 @@
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.io.SSTableWriter;
import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.io.IStreamComplete;
@@ -301,7 +301,7 @@
* do a complete compaction since we can figure out based on the ranges
* whether the files need to be split.
*/
- public List<SSTableReader> forceAntiCompaction(List<Range> ranges, EndPoint target)
+ public List<SSTableReader> forceAntiCompaction(List<Range> ranges, InetAddress target)
{
List<SSTableReader> allResults = new ArrayList<SSTableReader>();
Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java Wed Oct 21 18:26:02 2009
@@ -22,14 +22,10 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.Serializable;
-
-import javax.xml.bind.annotation.XmlElement;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
/*
@@ -51,7 +47,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
WriteResponse.serializer().serialize(writeResponseMessage, dos);
- return original.getReply(StorageService.getLocalStorageEndPoint(), bos.toByteArray());
+ return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray());
}
private final String table_;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Wed Oct 21 18:26:02 2009
@@ -31,13 +31,13 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.io.File;
+ import java.net.InetAddress;
import org.apache.log4j.Logger;
import org.apache.commons.lang.ArrayUtils;
import org.apache.cassandra.locator.TokenMetadata;
- import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.net.*;
import org.apache.cassandra.net.io.StreamContextManager;
import org.apache.cassandra.net.io.IStreamComplete;
@@ -77,12 +77,12 @@
private static final ExecutorService bootstrapExecutor_ = new DebuggableThreadPoolExecutor("BOOT-STRAPPER");
/* endpoints that need to be bootstrapped */
- protected EndPoint[] targets_ = new EndPoint[0];
+ protected InetAddress[] targets_ = new InetAddress[0];
/* tokens of the nodes being bootstrapped. */
protected final Token[] tokens_;
protected TokenMetadata tokenMetadata_ = null;
- public BootStrapper(EndPoint[] target, Token... token)
+ public BootStrapper(InetAddress[] target, Token... token)
{
targets_ = target;
tokens_ = token;
@@ -116,7 +116,7 @@
Map<Range, List<BootstrapSourceTarget>> getRangesWithSourceTarget()
{
/* copy the token to endpoint map */
- Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ Map<Token, InetAddress> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
/* remove the tokens associated with the endpoints being bootstrapped */
for (Token token : tokens_)
{
@@ -133,9 +133,9 @@
*/
Map<Range, List<Range>> splitRanges = LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRanges, tokens_);
/* Calculate the list of nodes that handle the old ranges */
- Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges, tokenToEndPointMap);
+ Map<Range, List<InetAddress>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges, tokenToEndPointMap);
/* Mapping of split ranges to the list of endpoints responsible for the range */
- Map<Range, List<EndPoint>> replicasForSplitRanges = new HashMap<Range, List<EndPoint>>();
+ Map<Range, List<InetAddress>> replicasForSplitRanges = new HashMap<Range, List<InetAddress>>();
Set<Range> rangesSplit = splitRanges.keySet();
for ( Range splitRange : rangesSplit )
{
@@ -151,11 +151,11 @@
for ( Range splitRange : rangesSplit )
{
List<Range> subRanges = splitRanges.get(splitRange);
- List<EndPoint> replicas = replicasForSplitRanges.get(splitRange);
+ List<InetAddress> replicas = replicasForSplitRanges.get(splitRange);
for ( Range subRange : subRanges )
{
/* Make sure we clone or else we are hammered. */
- oldRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
+ oldRangeToEndPointMap.put(subRange, new ArrayList<InetAddress>(replicas));
}
}
@@ -166,15 +166,15 @@
if (logger_.isDebugEnabled())
logger_.debug("Total number of new ranges " + newRanges.length);
/* Calculate the list of nodes that handle the new ranges */
- Map<Range, List<EndPoint>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges);
+ Map<Range, List<InetAddress>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges);
/* Calculate ranges that need to be sent and from whom to where */
Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = LeaveJoinProtocolHelper.getRangeSourceTargetInfo(oldRangeToEndPointMap, newRangeToEndPointMap);
return rangesWithSourceTarget;
}
- private static Token<?> getBootstrapTokenFrom(EndPoint maxEndpoint)
+ private static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint)
{
- Message message = new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.bootstrapTokenVerbHandler_, ArrayUtils.EMPTY_BYTE_ARRAY);
+ Message message = new Message(FBUtilities.getLocalAddress(), "", StorageService.bootstrapTokenVerbHandler_, ArrayUtils.EMPTY_BYTE_ARRAY);
BootstrapTokenCallback btc = new BootstrapTokenCallback();
MessagingService.instance().sendRR(message, maxEndpoint, btc);
return btc.getToken();
@@ -193,8 +193,8 @@
if (DatabaseDescriptor.getInitialToken() == null)
{
double maxLoad = 0;
- EndPoint maxEndpoint = null;
- for (Map.Entry<EndPoint,Double> entry : slb.getLoadInfo().entrySet())
+ InetAddress maxEndpoint = null;
+ for (Map.Entry<InetAddress,Double> entry : slb.getLoadInfo().entrySet())
{
if (maxEndpoint == null || entry.getValue() > maxLoad)
{
@@ -207,16 +207,15 @@
throw new RuntimeException("No bootstrap sources found");
}
- if (!maxEndpoint.equals(StorageService.getLocalStorageEndPoint()))
+ if (!maxEndpoint.equals(FBUtilities.getLocalAddress()))
{
- EndPoint maxStorageEndpoint = new EndPoint(maxEndpoint.getHost(), DatabaseDescriptor.getStoragePort());
- Token<?> t = getBootstrapTokenFrom(maxStorageEndpoint);
+ Token<?> t = getBootstrapTokenFrom(maxEndpoint);
logger_.info("Setting token to " + t + " to assume load from " + maxEndpoint);
ss.updateToken(t);
}
}
- BootStrapper bs = new BootStrapper(new EndPoint[] {StorageService.getLocalStorageEndPoint()}, ss.getLocalToken());
+ BootStrapper bs = new BootStrapper(new InetAddress[] { FBUtilities.getLocalAddress() }, ss.getLocalToken());
bootstrapExecutor_.submit(bs);
Gossiper.instance().addApplicationState(StorageService.BOOTSTRAP_MODE, new ApplicationState(""));
}
@@ -231,7 +230,7 @@
Message response;
try
{
- response = message.getReply(ss.getLocalStorageEndPoint(), tokens.get(1).getBytes("UTF-8"));
+ response = message.getReply(FBUtilities.getLocalAddress(), tokens.get(1).getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e)
{
@@ -319,14 +318,14 @@
if (logger_.isDebugEnabled())
logger_.debug("Received Data from : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
streamContext.setTargetFile(file);
- addStreamContext(message.getFrom().getHost(), streamContext, streamStatus);
+ addStreamContext(message.getFrom(), streamContext, streamStatus);
}
- StreamContextManager.registerStreamCompletionHandler(message.getFrom().getHost(), new BootstrapCompletionHandler());
+ StreamContextManager.registerStreamCompletionHandler(message.getFrom(), new BootstrapCompletionHandler());
/* Send a bootstrap initiation done message to execute on default stage. */
if (logger_.isDebugEnabled())
logger_.debug("Sending a bootstrap initiate done message ...");
- Message doneMessage = new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateDoneVerbHandler_, new byte[0] );
+ Message doneMessage = new Message(FBUtilities.getLocalAddress(), "", StorageService.bootStrapInitiateDoneVerbHandler_, new byte[0] );
MessagingService.instance().sendOneWay(doneMessage, message.getFrom());
}
catch ( IOException ex )
@@ -384,7 +383,7 @@
return fileNames;
}
- private void addStreamContext(String host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus)
+ private void addStreamContext(InetAddress host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus)
{
if (logger_.isDebugEnabled())
logger_.debug("Adding stream context " + streamContext + " for " + host + " ...");
@@ -400,7 +399,7 @@
*/
private static class BootstrapCompletionHandler implements IStreamComplete
{
- public void onStreamCompletion(String host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException
+ public void onStreamCompletion(InetAddress host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException
{
/* Parse the stream context and the file to the list of SSTables in the associated Column Family Store. */
if (streamContext.getTargetFile().contains("-Data.db"))
@@ -426,16 +425,15 @@
}
}
- EndPoint to = new EndPoint(host, DatabaseDescriptor.getStoragePort());
if (logger_.isDebugEnabled())
- logger_.debug("Sending a bootstrap terminate message with " + streamStatus + " to " + to);
+ logger_.debug("Sending a bootstrap terminate message with " + streamStatus + " to " + host);
/* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
StreamContextManager.StreamStatusMessage streamStatusMessage = new StreamContextManager.StreamStatusMessage(streamStatus);
Message message = StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
- MessagingService.instance().sendOneWay(message, to);
+ MessagingService.instance().sendOneWay(message, host);
/* If we're done with everything for this host, remove from bootstrap sources */
- if (StreamContextManager.isDone(to.getHost()))
- StorageService.instance().removeBootstrapSource(to);
+ if (StreamContextManager.isDone(host))
+ StorageService.instance().removeBootstrapSource(host);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java Wed Oct 21 18:26:02 2009
@@ -24,12 +24,11 @@
import java.io.IOException;
import java.io.Serializable;
-import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.io.StreamContextManager;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.net.io.*;
+import org.apache.cassandra.utils.FBUtilities;
public class BootstrapInitiateMessage implements Serializable
{
@@ -50,7 +49,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
BootstrapInitiateMessage.serializer().serialize(biMessage, dos);
- return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateVerbHandler_, bos.toByteArray() );
+ return new Message(FBUtilities.getLocalAddress(), "", StorageService.bootStrapInitiateVerbHandler_, bos.toByteArray() );
}
protected StreamContextManager.StreamContext[] streamContexts_ = new StreamContextManager.StreamContext[0];
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java Wed Oct 21 18:26:02 2009
@@ -26,7 +26,7 @@
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
@@ -50,10 +50,10 @@
return serializer_;
}
- protected EndPoint target_;
+ protected InetAddress target_;
protected List<Range> ranges_;
- BootstrapMetadata(EndPoint target, List<Range> ranges)
+ BootstrapMetadata(InetAddress target, List<Range> ranges)
{
target_ = target;
ranges_ = ranges;
@@ -87,7 +87,7 @@
public BootstrapMetadata deserialize(DataInputStream dis) throws IOException
{
- EndPoint target = CompactEndPointSerializationHelper.deserialize(dis);
+ InetAddress target = CompactEndPointSerializationHelper.deserialize(dis);
int size = dis.readInt();
List<Range> ranges = (size == 0) ? null : new ArrayList<Range>();
for( int i = 0; i < size; ++i )
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java Wed Oct 21 18:26:02 2009
@@ -26,10 +26,10 @@
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
-
-/**
+ /**
* This class encapsulates the message that needs to be sent
* to nodes that handoff data. The message contains information
* about the node to be bootstrapped and the ranges with which
@@ -53,7 +53,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
BootstrapMetadataMessage.serializer().serialize(bsMetadataMessage, dos);
- return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootstrapMetadataVerbHandler_, bos.toByteArray() );
+ return new Message(FBUtilities.getLocalAddress(), "", StorageService.bootstrapMetadataVerbHandler_, bos.toByteArray() );
}
protected BootstrapMetadata[] bsMetadata_ = new BootstrapMetadata[0];
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java Wed Oct 21 18:26:02 2009
@@ -27,7 +27,7 @@
import org.apache.cassandra.db.Table;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -93,7 +93,7 @@
* locally for each range and then stream them using
* the Bootstrap protocol to the target endpoint.
*/
- private void doTransfer(EndPoint target, List<Range> ranges) throws IOException
+ private void doTransfer(InetAddress target, List<Range> ranges) throws IOException
{
if ( ranges.size() == 0 )
{
@@ -146,7 +146,7 @@
* Stream the files in the bootstrap directory over to the
* node being bootstrapped.
*/
- private void doHandoff(EndPoint target, List<String> fileList, String table) throws IOException
+ private void doHandoff(InetAddress target, List<String> fileList, String table) throws IOException
{
List<File> filesList = new ArrayList<File>();
for(String file : fileList)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java Wed Oct 21 18:26:02 2009
@@ -18,7 +18,7 @@
package org.apache.cassandra.dht;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
/**
* This class encapsulates who is the source and the
@@ -26,10 +26,10 @@
*/
class BootstrapSourceTarget
{
- protected EndPoint source_;
- protected EndPoint target_;
+ protected InetAddress source_;
+ protected InetAddress target_;
- BootstrapSourceTarget(EndPoint source, EndPoint target)
+ BootstrapSourceTarget(InetAddress source, InetAddress target)
{
source_ = source;
target_ = target;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java Wed Oct 21 18:26:02 2009
@@ -28,7 +28,7 @@
import org.apache.log4j.Logger;
- import org.apache.cassandra.net.EndPoint;
+ import java.net.InetAddress;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
@@ -100,7 +100,7 @@
return splitRanges;
}
- protected static Map<Range, List<BootstrapSourceTarget>> getRangeSourceTargetInfo(Map<Range, List<EndPoint>> oldRangeToEndPointMap, Map<Range, List<EndPoint>> newRangeToEndPointMap)
+ protected static Map<Range, List<BootstrapSourceTarget>> getRangeSourceTargetInfo(Map<Range, List<InetAddress>> oldRangeToEndPointMap, Map<Range, List<InetAddress>> newRangeToEndPointMap)
{
Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = new HashMap<Range, List<BootstrapSourceTarget>>();
/*
@@ -114,12 +114,12 @@
{
if (logger_.isDebugEnabled())
logger_.debug("Attempting to figure out the dudes who are bumped out for " + range + " ...");
- List<EndPoint> oldEndPoints = oldRangeToEndPointMap.get(range);
- List<EndPoint> newEndPoints = newRangeToEndPointMap.get(range);
+ List<InetAddress> oldEndPoints = oldRangeToEndPointMap.get(range);
+ List<InetAddress> newEndPoints = newRangeToEndPointMap.get(range);
if ( newEndPoints != null )
{
- List<EndPoint> newEndPoints2 = new ArrayList<EndPoint>(newEndPoints);
- for ( EndPoint newEndPoint : newEndPoints2 )
+ List<InetAddress> newEndPoints2 = new ArrayList<InetAddress>(newEndPoints);
+ for ( InetAddress newEndPoint : newEndPoints2 )
{
if ( oldEndPoints.contains(newEndPoint) )
{
@@ -137,8 +137,8 @@
}
for ( Range range : oldRangeSet )
{
- List<EndPoint> oldEndPoints = oldRangeToEndPointMap.get(range);
- List<EndPoint> newEndPoints = newRangeToEndPointMap.get(range);
+ List<InetAddress> oldEndPoints = oldRangeToEndPointMap.get(range);
+ List<InetAddress> newEndPoints = newRangeToEndPointMap.get(range);
List<BootstrapSourceTarget> srcTarget = rangesWithSourceTarget.get(range);
if ( srcTarget == null )
{
@@ -146,7 +146,7 @@
rangesWithSourceTarget.put(range, srcTarget);
}
int i = 0;
- for ( EndPoint oldEndPoint : oldEndPoints )
+ for ( InetAddress oldEndPoint : oldEndPoints )
{
srcTarget.add( new BootstrapSourceTarget(oldEndPoint, newEndPoints.get(i++)) );
}
@@ -160,7 +160,7 @@
*/
protected static void assignWork(Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget) throws IOException
{
- Map<EndPoint, Map<EndPoint, List<Range>>> rangeInfo = getWorkMap(rangesWithSourceTarget);
+ Map<InetAddress, Map<InetAddress, List<Range>>> rangeInfo = getWorkMap(rangesWithSourceTarget);
sendMessagesToBootstrapSources(rangeInfo);
}
@@ -168,15 +168,15 @@
* This method takes the Src -> (Tgt-> List of ranges) maps and retains those entries
* that are relevant to bootstrapping the target endpoint
*/
- protected static Map<EndPoint, Map<EndPoint, List<Range>>>
- filterRangesForTargetEndPoint(Map<EndPoint, Map<EndPoint, List<Range>>> rangeInfo, EndPoint targetEndPoint)
+ protected static Map<InetAddress, Map<InetAddress, List<Range>>>
+ filterRangesForTargetEndPoint(Map<InetAddress, Map<InetAddress, List<Range>>> rangeInfo, InetAddress targetEndPoint)
{
- Map<EndPoint, Map<EndPoint, List<Range>>> filteredMap = new HashMap<EndPoint, Map<EndPoint,List<Range>>>();
- for (Map.Entry<EndPoint, Map<EndPoint, List<Range>>> e: rangeInfo.entrySet())
+ Map<InetAddress, Map<InetAddress, List<Range>>> filteredMap = new HashMap<InetAddress, Map<InetAddress,List<Range>>>();
+ for (Map.Entry<InetAddress, Map<InetAddress, List<Range>>> e: rangeInfo.entrySet())
{
- EndPoint source = e.getKey();
- Map<EndPoint, List<Range>> targets = e.getValue();
- Map<EndPoint, List<Range>> filteredTargets = new HashMap<EndPoint, List<Range>>();
+ InetAddress source = e.getKey();
+ Map<InetAddress, List<Range>> targets = e.getValue();
+ Map<InetAddress, List<Range>> filteredTargets = new HashMap<InetAddress, List<Range>>();
if (targets.get(targetEndPoint) != null)
filteredTargets.put(targetEndPoint, targets.get(targetEndPoint));
if (filteredTargets.size() > 0)
@@ -185,16 +185,16 @@
return filteredMap;
}
- private static void sendMessagesToBootstrapSources(Map<EndPoint, Map<EndPoint, List<Range>>> rangeInfo) throws IOException
+ private static void sendMessagesToBootstrapSources(Map<InetAddress, Map<InetAddress, List<Range>>> rangeInfo) throws IOException
{
- Set<EndPoint> sources = rangeInfo.keySet();
- for ( EndPoint source : sources )
+ Set<InetAddress> sources = rangeInfo.keySet();
+ for ( InetAddress source : sources )
{
- Map<EndPoint, List<Range>> targetRangesMap = rangeInfo.get(source);
- Set<EndPoint> targets = targetRangesMap.keySet();
+ Map<InetAddress, List<Range>> targetRangesMap = rangeInfo.get(source);
+ Set<InetAddress> targets = targetRangesMap.keySet();
List<BootstrapMetadata> bsmdList = new ArrayList<BootstrapMetadata>();
- for ( EndPoint target : targets )
+ for ( InetAddress target : targets )
{
List<Range> rangeForTarget = targetRangesMap.get(target);
BootstrapMetadata bsMetadata = new BootstrapMetadata(target, rangeForTarget);
@@ -211,14 +211,14 @@
}
}
- static Map<EndPoint, Map<EndPoint, List<Range>>> getWorkMap(
+ static Map<InetAddress, Map<InetAddress, List<Range>>> getWorkMap(
Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget)
{
/*
* Map whose key is the source node and the value is a map whose key is the
* target and value is the list of ranges to be sent to it.
*/
- Map<EndPoint, Map<EndPoint, List<Range>>> rangeInfo = new HashMap<EndPoint, Map<EndPoint, List<Range>>>();
+ Map<InetAddress, Map<InetAddress, List<Range>>> rangeInfo = new HashMap<InetAddress, Map<InetAddress, List<Range>>>();
Set<Range> ranges = rangesWithSourceTarget.keySet();
for ( Range range : ranges )
@@ -226,10 +226,10 @@
List<BootstrapSourceTarget> rangeSourceTargets = rangesWithSourceTarget.get(range);
for ( BootstrapSourceTarget rangeSourceTarget : rangeSourceTargets )
{
- Map<EndPoint, List<Range>> targetRangeMap = rangeInfo.get(rangeSourceTarget.source_);
+ Map<InetAddress, List<Range>> targetRangeMap = rangeInfo.get(rangeSourceTarget.source_);
if ( targetRangeMap == null )
{
- targetRangeMap = new HashMap<EndPoint, List<Range>>();
+ targetRangeMap = new HashMap<InetAddress, List<Range>>();
rangeInfo.put(rangeSourceTarget.source_, targetRangeMap);
}
List<Range> rangesToGive = targetRangeMap.get(rangeSourceTarget.target_);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java Wed Oct 21 18:26:02 2009
@@ -29,7 +29,7 @@
import org.apache.log4j.Logger;
import org.apache.cassandra.locator.TokenMetadata;
- import org.apache.cassandra.net.EndPoint;
+ import java.net.InetAddress;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
@@ -45,13 +45,13 @@
private static Logger logger_ = Logger.getLogger(LeaveJoinProtocolImpl.class);
/* endpoints that are to be moved. */
- protected EndPoint[] targets_ = new EndPoint[0];
+ protected InetAddress[] targets_ = new InetAddress[0];
/* position where they need to be moved */
protected final Token[] tokens_;
/* token metadata information */
protected TokenMetadata tokenMetadata_ = null;
- public LeaveJoinProtocolImpl(EndPoint[] targets, Token[] tokens)
+ public LeaveJoinProtocolImpl(InetAddress[] targets, Token[] tokens)
{
targets_ = targets;
tokens_ = tokens;
@@ -65,16 +65,16 @@
if (logger_.isDebugEnabled())
logger_.debug("Beginning leave/join process for ...");
/* copy the token to endpoint map */
- Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ Map<Token, InetAddress> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
/* copy the endpoint to token map */
- Map<EndPoint, Token> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
+ Map<InetAddress, Token> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
if (logger_.isDebugEnabled())
logger_.debug("Total number of old ranges " + oldRanges.length);
/* Calculate the list of nodes that handle the old ranges */
- Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges);
+ Map<Range, List<InetAddress>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges);
/* Remove the tokens of the nodes leaving the ring */
Set<Token> tokens = getTokensForLeavingNodes();
@@ -98,7 +98,7 @@
tokenToEndPointMap.put(tokens_[i], targets_[i]);
}
/* Calculate the list of nodes that handle the new ranges */
- Map<Range, List<EndPoint>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges, tokenToEndPointMap);
+ Map<Range, List<InetAddress>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges, tokenToEndPointMap);
/* Remove any expanded ranges and replace them with ranges whose aggregate is the expanded range in the new configuration. */
removeExpandedRangesFromNewConfiguration(newRangeToEndPointMap, expandedRangeToOldRangeMap);
/* Calculate ranges that need to be sent and from whom to where */
@@ -132,7 +132,7 @@
* @param rangesAfterNodesJoin ranges after the nodes have joined at
* their respective position.
*/
- private void addSplitRangesToOldConfiguration(Map<Range, List<EndPoint>> oldRangeToEndPointMap, Range[] rangesAfterNodesJoin)
+ private void addSplitRangesToOldConfiguration(Map<Range, List<InetAddress>> oldRangeToEndPointMap, Range[] rangesAfterNodesJoin)
{
/*
* Find the ranges that are split. Maintain a mapping between
@@ -140,7 +140,7 @@
*/
Map<Range, List<Range>> splitRanges = LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRangeToEndPointMap.keySet().toArray( new Range[0] ), tokens_);
/* Mapping of split ranges to the list of endpoints responsible for the range */
- Map<Range, List<EndPoint>> replicasForSplitRanges = new HashMap<Range, List<EndPoint>>();
+ Map<Range, List<InetAddress>> replicasForSplitRanges = new HashMap<Range, List<InetAddress>>();
Set<Range> rangesSplit = splitRanges.keySet();
for ( Range splitRange : rangesSplit )
{
@@ -156,11 +156,11 @@
for ( Range splitRange : rangesSplit )
{
List<Range> subRanges = splitRanges.get(splitRange);
- List<EndPoint> replicas = replicasForSplitRanges.get(splitRange);
+ List<InetAddress> replicas = replicasForSplitRanges.get(splitRange);
for ( Range subRange : subRanges )
{
/* Make sure we clone or else we are hammered. */
- oldRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
+ oldRangeToEndPointMap.put(subRange, new ArrayList<InetAddress>(replicas));
}
}
}
@@ -175,10 +175,10 @@
* @param expandedRangeToOldRangeMap mapping between the expanded ranges
* and the ranges whose aggregate is the expanded range.
*/
- private void removeExpandedRangesFromNewConfiguration(Map<Range, List<EndPoint>> newRangeToEndPointMap, Map<Range, List<Range>> expandedRangeToOldRangeMap)
+ private void removeExpandedRangesFromNewConfiguration(Map<Range, List<InetAddress>> newRangeToEndPointMap, Map<Range, List<Range>> expandedRangeToOldRangeMap)
{
/* Get the replicas for the expanded ranges */
- Map<Range, List<EndPoint>> replicasForExpandedRanges = new HashMap<Range, List<EndPoint>>();
+ Map<Range, List<InetAddress>> replicasForExpandedRanges = new HashMap<Range, List<InetAddress>>();
Set<Range> expandedRanges = expandedRangeToOldRangeMap.keySet();
for ( Range expandedRange : expandedRanges )
{
@@ -189,10 +189,10 @@
for ( Range expandedRange : expandedRanges )
{
List<Range> subRanges = expandedRangeToOldRangeMap.get(expandedRange);
- List<EndPoint> replicas = replicasForExpandedRanges.get(expandedRange);
+ List<InetAddress> replicas = replicasForExpandedRanges.get(expandedRange);
for ( Range subRange : subRanges )
{
- newRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
+ newRangeToEndPointMap.put(subRange, new ArrayList<InetAddress>(replicas));
}
}
}
@@ -200,7 +200,7 @@
private Set<Token> getTokensForLeavingNodes()
{
Set<Token> tokens = new HashSet<Token>();
- for ( EndPoint target : targets_ )
+ for ( InetAddress target : targets_ )
{
tokens.add(tokenMetadata_.getToken(target));
}
@@ -277,16 +277,16 @@
public static void main(String[] args) throws Throwable
{
StorageService ss = StorageService.instance();
- ss.updateTokenMetadataUnsafe(new BigIntegerToken("3"), new EndPoint("A", 7000));
- ss.updateTokenMetadataUnsafe(new BigIntegerToken("6"), new EndPoint("B", 7000));
- ss.updateTokenMetadataUnsafe(new BigIntegerToken("9"), new EndPoint("C", 7000));
- ss.updateTokenMetadataUnsafe(new BigIntegerToken("12"), new EndPoint("D", 7000));
- ss.updateTokenMetadataUnsafe(new BigIntegerToken("15"), new EndPoint("E", 7000));
- ss.updateTokenMetadataUnsafe(new BigIntegerToken("18"), new EndPoint("F", 7000));
- ss.updateTokenMetadataUnsafe(new BigIntegerToken("21"), new EndPoint("G", 7000));
- ss.updateTokenMetadataUnsafe(new BigIntegerToken("24"), new EndPoint("H", 7000));
+ ss.updateTokenMetadataUnsafe(new BigIntegerToken("3"), InetAddress.getByName("A"));
+ ss.updateTokenMetadataUnsafe(new BigIntegerToken("6"), InetAddress.getByName("B"));
+ ss.updateTokenMetadataUnsafe(new BigIntegerToken("9"), InetAddress.getByName("C"));
+ ss.updateTokenMetadataUnsafe(new BigIntegerToken("12"), InetAddress.getByName("D"));
+ ss.updateTokenMetadataUnsafe(new BigIntegerToken("15"), InetAddress.getByName("E"));
+ ss.updateTokenMetadataUnsafe(new BigIntegerToken("18"), InetAddress.getByName("F"));
+ ss.updateTokenMetadataUnsafe(new BigIntegerToken("21"), InetAddress.getByName("G"));
+ ss.updateTokenMetadataUnsafe(new BigIntegerToken("24"), InetAddress.getByName("H"));
- Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new Token[]{new BigIntegerToken("22"), new BigIntegerToken("23")} );
+ Runnable runnable = new LeaveJoinProtocolImpl( new InetAddress[]{InetAddress.getByName("C"), InetAddress.getByName("D")}, new Token[]{new BigIntegerToken("22"), new BigIntegerToken("23")} );
runnable.run();
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java Wed Oct 21 18:26:02 2009
@@ -30,7 +30,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.LogUtil;
import org.apache.cassandra.utils.BoundedStatsDeque;
@@ -75,7 +75,7 @@
return failureDetector_;
}
- private Map<EndPoint, ArrivalWindow> arrivalSamples_ = new Hashtable<EndPoint, ArrivalWindow>();
+ private Map<InetAddress, ArrivalWindow> arrivalSamples_ = new Hashtable<InetAddress, ArrivalWindow>();
private List<IFailureDetectionEventListener> fdEvntListeners_ = new ArrayList<IFailureDetectionEventListener>();
public FailureDetector()
@@ -117,7 +117,7 @@
*
* @param ep for which the arrival window needs to be dumped.
*/
- private void dumpInterArrivalTimes(EndPoint ep)
+ private void dumpInterArrivalTimes(InetAddress ep)
{
long now = System.currentTimeMillis();
if ( (now - FailureDetector.creationTime_) <= FailureDetector.uptimeThreshold_ )
@@ -135,26 +135,19 @@
}
}
- public boolean isAlive(EndPoint ep)
+ public boolean isAlive(InetAddress ep)
{
- try
- {
- /* If the endpoint in question is the local endpoint return true. */
- String localHost = FBUtilities.getHostAddress();
- if ( localHost.equals( ep.getHost() ) )
- return true;
- }
- catch( UnknownHostException ex )
- {
- logger_.info( LogUtil.throwableToString(ex) );
- }
+ /* If the endpoint in question is the local endpoint return true. */
+ InetAddress localHost = FBUtilities.getLocalAddress();
+ if (localHost.equals(ep))
+ return true;
+
/* Incoming port is assumed to be the Storage port. We need to change it to the control port */
- EndPoint ep2 = new EndPoint(ep.getHost(), DatabaseDescriptor.getControlPort());
- EndPointState epState = Gossiper.instance().getEndPointStateForEndPoint(ep2);
+ EndPointState epState = Gossiper.instance().getEndPointStateForEndPoint(ep);
return epState.isAlive();
}
- public void report(EndPoint ep)
+ public void report(InetAddress ep)
{
if (logger_.isTraceEnabled())
logger_.trace("reporting " + ep);
@@ -168,7 +161,7 @@
heartbeatWindow.add(now);
}
- public void interpret(EndPoint ep)
+ public void interpret(InetAddress ep)
{
ArrivalWindow hbWnd = arrivalSamples_.get(ep);
if ( hbWnd == null )
@@ -214,10 +207,10 @@
public String toString()
{
StringBuilder sb = new StringBuilder();
- Set<EndPoint> eps = arrivalSamples_.keySet();
+ Set<InetAddress> eps = arrivalSamples_.keySet();
sb.append("-----------------------------------------------------------------------");
- for ( EndPoint ep : eps )
+ for ( InetAddress ep : eps )
{
ArrivalWindow hWnd = arrivalSamples_.get(ep);
sb.append(ep + " : ");
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java Wed Oct 21 18:26:02 2009
@@ -24,7 +24,7 @@
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
import org.apache.cassandra.net.*;
/**
@@ -40,7 +40,7 @@
serializer_ = new GossipDigestSerializer();
}
- EndPoint endPoint_;
+ InetAddress endPoint_;
int generation_;
int maxVersion_;
@@ -49,14 +49,14 @@
return serializer_;
}
- GossipDigest(EndPoint endPoint, int generation, int maxVersion)
+ GossipDigest(InetAddress endPoint, int generation, int maxVersion)
{
endPoint_ = endPoint;
generation_ = generation;
maxVersion_ = maxVersion;
}
- EndPoint getEndPoint()
+ InetAddress getEndPoint()
{
return endPoint_;
}
@@ -101,7 +101,7 @@
public GossipDigest deserialize(DataInputStream dis) throws IOException
{
- EndPoint endPoint = CompactEndPointSerializationHelper.deserialize(dis);
+ InetAddress endPoint = CompactEndPointSerializationHelper.deserialize(dis);
int generation = dis.readInt();
int version = dis.readInt();
return new GossipDigest(endPoint, generation, version);