You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/25 20:12:13 UTC
[16/19] cassandra git commit: Allow storage port to be configurable
per node
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index 8c0ed1e..f01197d 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -52,11 +52,13 @@ public class ConfigHelper
private static final int DEFAULT_RANGE_BATCH_SIZE = 4096;
private static final String INPUT_INITIAL_ADDRESS = "cassandra.input.address";
private static final String OUTPUT_INITIAL_ADDRESS = "cassandra.output.address";
+ private static final String OUTPUT_INITIAL_PORT = "cassandra.output.port";
private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read";
private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";
private static final String OUTPUT_COMPRESSION_CLASS = "cassandra.output.compression.class";
private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = "cassandra.output.compression.length";
private static final String OUTPUT_LOCAL_DC_ONLY = "cassandra.output.local.dc.only";
+ private static final String DEFAULT_CASSANDRA_NATIVE_PORT = "7000";
private static final Logger logger = LoggerFactory.getLogger(ConfigHelper.class);
@@ -349,6 +351,16 @@ public class ConfigHelper
return conf.get(OUTPUT_INITIAL_ADDRESS);
}
+ public static void setOutputInitialPort(Configuration conf, Integer port)
+ {
+ conf.set(OUTPUT_INITIAL_PORT, port.toString());
+ }
+
+ public static Integer getOutputInitialPort(Configuration conf)
+ {
+ return Integer.valueOf(conf.get(OUTPUT_INITIAL_PORT, DEFAULT_CASSANDRA_NATIVE_PORT));
+ }
+
public static void setOutputInitialAddress(Configuration conf, String address)
{
conf.set(OUTPUT_INITIAL_ADDRESS, address);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index 0f44e0c..204d9ee 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -21,11 +21,13 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
+import com.google.common.net.HostAndPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +41,7 @@ import org.apache.cassandra.hadoop.HadoopCompat;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.utils.NativeSSTableLoaderClient;
@@ -80,7 +83,7 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>>
protected SSTableLoader loader;
protected Progressable progress;
protected TaskAttemptContext context;
- protected final Set<InetAddress> ignores = new HashSet<>();
+ protected final Set<InetAddressAndPort> ignores = new HashSet<>();
private String keyspace;
private String table;
@@ -139,7 +142,7 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>>
try
{
for (String hostToIgnore : CqlBulkOutputFormat.getIgnoreHosts(conf))
- ignores.add(InetAddress.getByName(hostToIgnore));
+ ignores.add(InetAddressAndPort.getByName(hostToIgnore));
}
catch (UnknownHostException e)
{
@@ -285,20 +288,23 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>>
{
super(resolveHostAddresses(conf),
CqlConfigHelper.getOutputNativePort(conf),
+ ConfigHelper.getOutputInitialPort(conf),
ConfigHelper.getOutputKeyspaceUserName(conf),
ConfigHelper.getOutputKeyspacePassword(conf),
- CqlConfigHelper.getSSLOptions(conf).orNull());
+ CqlConfigHelper.getSSLOptions(conf).orNull(),
+ CqlConfigHelper.getAllowServerPortDiscovery(conf));
}
- private static Collection<InetAddress> resolveHostAddresses(Configuration conf)
+ private static Collection<InetSocketAddress> resolveHostAddresses(Configuration conf)
{
- Set<InetAddress> addresses = new HashSet<>();
-
+ Set<InetSocketAddress> addresses = new HashSet<>();
+ int port = CqlConfigHelper.getOutputNativePort(conf);
for (String host : ConfigHelper.getOutputInitialAddress(conf).split(","))
{
try
{
- addresses.add(InetAddress.getByName(host));
+ HostAndPort hap = HostAndPort.fromString(host);
+ addresses.add(new InetSocketAddress(InetAddress.getByName(hap.getHost()), hap.getPortOrDefault(port)));
}
catch (UnknownHostException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index f9a6f3a..3a47a72 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -88,6 +88,7 @@ public class CqlConfigHelper
private static final String OUTPUT_CQL = "cassandra.output.cql";
private static final String OUTPUT_NATIVE_PORT = "cassandra.output.native.port";
+ private static final String ALLOW_SERVER_PORT_DISCOVERY = "cassandra.allowserverportdiscovery";
/**
* Set the CQL columns for the input of this job.
@@ -651,4 +652,15 @@ public class CqlConfigHelper
new SecureRandom());
return ctx;
}
+
+ public static void setAllowServerPortDiscovery(Configuration conf, boolean allowServerPortDiscovery)
+ {
+ conf.set(ALLOW_SERVER_PORT_DISCOVERY, Boolean.toString(allowServerPortDiscovery));
+ }
+
+ public static boolean getAllowServerPortDiscovery(Configuration conf)
+ {
+ return Boolean.parseBoolean(conf.get(ALLOW_SERVER_PORT_DISCOVERY, "false"));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hints/HintVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintVerbHandler.java b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
index 2b92a42..cec6f0b 100644
--- a/src/java/org/apache/cassandra/hints/HintVerbHandler.java
+++ b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
@@ -18,13 +18,13 @@
*/
package org.apache.cassandra.hints;
-import java.net.InetAddress;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
@@ -47,7 +47,7 @@ public final class HintVerbHandler implements IVerbHandler<HintMessage>
{
UUID hostId = message.payload.hostId;
Hint hint = message.payload.hint;
- InetAddress address = StorageService.instance.getEndpointForHostId(hostId);
+ InetAddressAndPort address = StorageService.instance.getEndpointForHostId(hostId);
// If we see an unknown table id, it means the table, or one of the tables in the mutation, had been dropped.
// In that case there is nothing we can really do, or should do, other than log it go on.
@@ -96,7 +96,7 @@ public final class HintVerbHandler implements IVerbHandler<HintMessage>
}
}
- private static void reply(int id, InetAddress to)
+ private static void reply(int id, InetAddressAndPort to)
{
MessagingService.instance().sendReply(HintResponse.message, id, to);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index 58a3e6f..cbbb212 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.hints;
import java.io.File;
-import java.net.InetAddress;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
@@ -36,6 +35,7 @@ import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.service.StorageService;
/**
@@ -50,10 +50,10 @@ final class HintsDispatchExecutor
private final File hintsDirectory;
private final ExecutorService executor;
private final AtomicBoolean isPaused;
- private final Predicate<InetAddress> isAlive;
+ private final Predicate<InetAddressAndPort> isAlive;
private final Map<UUID, Future> scheduledDispatches;
- HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused, Predicate<InetAddress> isAlive)
+ HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused, Predicate<InetAddressAndPort> isAlive)
{
this.hintsDirectory = hintsDirectory;
this.isPaused = isPaused;
@@ -154,7 +154,7 @@ final class HintsDispatchExecutor
public void run()
{
UUID hostId = hostIdSupplier.get();
- InetAddress address = StorageService.instance.getEndpointForHostId(hostId);
+ InetAddressAndPort address = StorageService.instance.getEndpointForHostId(hostId);
logger.info("Transferring all hints to {}: {}", address, hostId);
if (transfer(hostId))
return;
@@ -257,7 +257,7 @@ final class HintsDispatchExecutor
{
logger.trace("Dispatching hints file {}", descriptor.fileName());
- InetAddress address = StorageService.instance.getEndpointForHostId(hostId);
+ InetAddressAndPort address = StorageService.instance.getEndpointForHostId(hostId);
if (address != null)
return deliver(descriptor, address);
@@ -266,7 +266,7 @@ final class HintsDispatchExecutor
return true;
}
- private boolean deliver(HintsDescriptor descriptor, InetAddress address)
+ private boolean deliver(HintsDescriptor descriptor, InetAddressAndPort address)
{
File file = new File(hintsDirectory, descriptor.fileName());
InputPosition offset = store.getDispatchOffset(descriptor);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
index 34d1eb2..ca38c0c 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
@@ -23,7 +23,7 @@ import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.schema.Schema;
-import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddress;
+import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
/**
* A simple dispatch trigger that's being run every 10 seconds.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hints/HintsDispatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index 323eeb1..d0d9aac 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.hints;
import java.io.File;
-import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.TimeUnit;
@@ -31,6 +30,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.monitoring.ApproximateTime;
import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.HintsServiceMetrics;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
@@ -51,13 +51,13 @@ final class HintsDispatcher implements AutoCloseable
private final HintsReader reader;
private final UUID hostId;
- private final InetAddress address;
+ private final InetAddressAndPort address;
private final int messagingVersion;
private final BooleanSupplier abortRequested;
private InputPosition currentPagePosition;
- private HintsDispatcher(HintsReader reader, UUID hostId, InetAddress address, int messagingVersion, BooleanSupplier abortRequested)
+ private HintsDispatcher(HintsReader reader, UUID hostId, InetAddressAndPort address, int messagingVersion, BooleanSupplier abortRequested)
{
currentPagePosition = null;
@@ -68,7 +68,7 @@ final class HintsDispatcher implements AutoCloseable
this.abortRequested = abortRequested;
}
- static HintsDispatcher create(File file, RateLimiter rateLimiter, InetAddress address, UUID hostId, BooleanSupplier abortRequested)
+ static HintsDispatcher create(File file, RateLimiter rateLimiter, InetAddressAndPort address, UUID hostId, BooleanSupplier abortRequested)
{
int messagingVersion = MessagingService.instance().getVersion(address);
return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, abortRequested);
@@ -228,7 +228,7 @@ final class HintsDispatcher implements AutoCloseable
return timedOut ? Outcome.TIMEOUT : outcome;
}
- public void onFailure(InetAddress from, RequestFailureReason failureReason)
+ public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
{
outcome = Outcome.FAILURE;
condition.signalAll();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 3d82c02..5c331d0 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.hints;
import java.io.File;
import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.UUID;
@@ -40,6 +39,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.HintedHandoffMetrics;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.dht.Token;
@@ -267,10 +267,10 @@ public final class HintsService implements HintsServiceMBean
*/
public void deleteAllHintsForEndpoint(String address)
{
- InetAddress target;
+ InetAddressAndPort target;
try
{
- target = InetAddress.getByName(address);
+ target = InetAddressAndPort.getByName(address);
}
catch (UnknownHostException e)
{
@@ -284,7 +284,7 @@ public final class HintsService implements HintsServiceMBean
*
* @param target inet address of the target node
*/
- public void deleteAllHintsForEndpoint(InetAddress target)
+ public void deleteAllHintsForEndpoint(InetAddressAndPort target)
{
UUID hostId = StorageService.instance.getHostIdForEndpoint(target);
if (hostId == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hints/HintsStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java b/src/java/org/apache/cassandra/hints/HintsStore.java
index 3572172..bbf57f5 100644
--- a/src/java/org/apache/cassandra/hints/HintsStore.java
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.hints;
import java.io.File;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
@@ -31,6 +30,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.SyncUtil;
@@ -77,14 +77,14 @@ final class HintsStore
return new HintsStore(hostId, hintsDirectory, writerParams, descriptors);
}
- InetAddress address()
+ InetAddressAndPort address()
{
return StorageService.instance.getEndpointForHostId(hostId);
}
boolean isLive()
{
- InetAddress address = address();
+ InetAddressAndPort address = address();
return address != null && FailureDetector.instance.isAlive(address);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/io/DummyByteVersionedSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/DummyByteVersionedSerializer.java b/src/java/org/apache/cassandra/io/DummyByteVersionedSerializer.java
new file mode 100644
index 0000000..d82ff7d
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/DummyByteVersionedSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Serializes a dummy byte that can't be set. Will always write 0 and return 0 in a correctly formed message.
+ */
+public class DummyByteVersionedSerializer implements IVersionedSerializer<byte[]>
+{
+ public static final DummyByteVersionedSerializer instance = new DummyByteVersionedSerializer();
+
+ private DummyByteVersionedSerializer() {}
+
+ public void serialize(byte[] bytes, DataOutputPlus out, int version) throws IOException
+ {
+ Preconditions.checkArgument(bytes == MessagingService.ONE_BYTE);
+ out.write(0);
+ }
+
+ public byte[] deserialize(DataInputPlus in, int version) throws IOException
+ {
+ assert(0 == in.readByte());
+ return MessagingService.ONE_BYTE;
+ }
+
+ public long serializedSize(byte[] bytes, int version)
+ {
+ //Payload
+ return 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/io/ShortVersionedSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/ShortVersionedSerializer.java b/src/java/org/apache/cassandra/io/ShortVersionedSerializer.java
new file mode 100644
index 0000000..8731f4c
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/ShortVersionedSerializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public class ShortVersionedSerializer implements IVersionedSerializer<Short>
+{
+
+ public static final ShortVersionedSerializer instance = new ShortVersionedSerializer();
+
+ private ShortVersionedSerializer() {}
+
+ public void serialize(Short aShort, DataOutputPlus out, int version) throws IOException
+ {
+ out.writeShort(aShort);
+ }
+
+ public Short deserialize(DataInputPlus in, int version) throws IOException
+ {
+ return in.readShort();
+ }
+
+ public long serializedSize(Short aShort, int version)
+ {
+ return 2;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 9fb3059..7d77ad5 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -19,12 +19,12 @@ package org.apache.cassandra.io.sstable;
import java.io.File;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.*;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.db.Directories;
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.*;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.cassandra.utils.Pair;
@@ -50,10 +49,10 @@ public class SSTableLoader implements StreamEventHandler
private final Client client;
private final int connectionsPerHost;
private final OutputHandler outputHandler;
- private final Set<InetAddress> failedHosts = new HashSet<>();
+ private final Set<InetAddressAndPort> failedHosts = new HashSet<>();
private final List<SSTableReader> sstables = new ArrayList<>();
- private final Multimap<InetAddress, StreamSession.SSTableStreamingSections> streamingDetails = HashMultimap.create();
+ private final Multimap<InetAddressAndPort, StreamSession.SSTableStreamingSections> streamingDetails = HashMultimap.create();
public SSTableLoader(File directory, Client client, OutputHandler outputHandler)
{
@@ -70,7 +69,7 @@ public class SSTableLoader implements StreamEventHandler
}
@SuppressWarnings("resource")
- protected Collection<SSTableReader> openSSTables(final Map<InetAddress, Collection<Range<Token>>> ranges)
+ protected Collection<SSTableReader> openSSTables(final Map<InetAddressAndPort, Collection<Range<Token>>> ranges)
{
outputHandler.output("Opening sstables and calculating sections to stream");
@@ -124,9 +123,9 @@ public class SSTableLoader implements StreamEventHandler
// calculate the sstable sections to stream as well as the estimated number of
// keys per host
- for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : ranges.entrySet())
+ for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : ranges.entrySet())
{
- InetAddress endpoint = entry.getKey();
+ InetAddressAndPort endpoint = entry.getKey();
Collection<Range<Token>> tokenRanges = entry.getValue();
List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges);
@@ -153,17 +152,17 @@ public class SSTableLoader implements StreamEventHandler
public StreamResultFuture stream()
{
- return stream(Collections.<InetAddress>emptySet());
+ return stream(Collections.<InetAddressAndPort>emptySet());
}
- public StreamResultFuture stream(Set<InetAddress> toIgnore, StreamEventHandler... listeners)
+ public StreamResultFuture stream(Set<InetAddressAndPort> toIgnore, StreamEventHandler... listeners)
{
client.init(keyspace);
outputHandler.output("Established connection to initial hosts");
StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, connectionsPerHost, false, false, null, PreviewKind.NONE).connectionFactory(client.getConnectionFactory());
- Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
+ Map<InetAddressAndPort, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
openSSTables(endpointToRanges);
if (sstables.isEmpty())
{
@@ -173,9 +172,9 @@ public class SSTableLoader implements StreamEventHandler
outputHandler.output(String.format("Streaming relevant part of %s to %s", names(sstables), endpointToRanges.keySet()));
- for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : endpointToRanges.entrySet())
+ for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : endpointToRanges.entrySet())
{
- InetAddress remote = entry.getKey();
+ InetAddressAndPort remote = entry.getKey();
if (toIgnore.contains(remote))
continue;
@@ -232,14 +231,14 @@ public class SSTableLoader implements StreamEventHandler
return builder.toString();
}
- public Set<InetAddress> getFailedHosts()
+ public Set<InetAddressAndPort> getFailedHosts()
{
return failedHosts;
}
public static abstract class Client
{
- private final Map<InetAddress, Collection<Range<Token>>> endpointToRanges = new HashMap<>();
+ private final Map<InetAddressAndPort, Collection<Range<Token>>> endpointToRanges = new HashMap<>();
/**
* Initialize the client.
@@ -281,12 +280,12 @@ public class SSTableLoader implements StreamEventHandler
throw new RuntimeException();
}
- public Map<InetAddress, Collection<Range<Token>>> getEndpointToRangesMap()
+ public Map<InetAddressAndPort, Collection<Range<Token>>> getEndpointToRangesMap()
{
return endpointToRanges;
}
- protected void addRangeForEndpoint(Range<Token> range, InetAddress endpoint)
+ protected void addRangeForEndpoint(Range<Token> range, InetAddressAndPort endpoint)
{
Collection<Range<Token>> ranges = endpointToRanges.get(endpoint);
if (ranges == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java b/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
index 546d15e..2ee8eea 100644
--- a/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
@@ -17,14 +17,13 @@
*/
package org.apache.cassandra.locator;
-import java.net.InetAddress;
import java.util.*;
import org.apache.cassandra.config.DatabaseDescriptor;
public abstract class AbstractEndpointSnitch implements IEndpointSnitch
{
- public abstract int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2);
+ public abstract int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2);
/**
* Sorts the <tt>Collection</tt> of node addresses by proximity to the given address
@@ -32,9 +31,9 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch
* @param unsortedAddress the nodes to sort
* @return a new sorted <tt>List</tt>
*/
- public List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress)
+ public List<InetAddressAndPort> getSortedListByProximity(InetAddressAndPort address, Collection<InetAddressAndPort> unsortedAddress)
{
- List<InetAddress> preferred = new ArrayList<InetAddress>(unsortedAddress);
+ List<InetAddressAndPort> preferred = new ArrayList<>(unsortedAddress);
sortByProximity(address, preferred);
return preferred;
}
@@ -44,11 +43,11 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch
* @param address the address to sort the proximity by
* @param addresses the nodes to sort
*/
- public void sortByProximity(final InetAddress address, List<InetAddress> addresses)
+ public void sortByProximity(final InetAddressAndPort address, List<InetAddressAndPort> addresses)
{
- Collections.sort(addresses, new Comparator<InetAddress>()
+ Collections.sort(addresses, new Comparator<InetAddressAndPort>()
{
- public int compare(InetAddress a1, InetAddress a2)
+ public int compare(InetAddressAndPort a1, InetAddressAndPort a2)
{
return compareEndpoints(address, a1, a2);
}
@@ -60,7 +59,7 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch
// noop by default
}
- public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2)
+ public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2)
{
// Querying remote DC is likely to be an order of magnitude slower than
// querying locally, so 2 queries to local nodes is likely to still be
@@ -71,10 +70,10 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch
: true;
}
- private boolean hasRemoteNode(List<InetAddress> l)
+ private boolean hasRemoteNode(List<InetAddressAndPort> l)
{
String localDc = DatabaseDescriptor.getLocalDataCenter();
- for (InetAddress ep : l)
+ for (InetAddressAndPort ep : l)
{
if (!localDc.equals(getDatacenter(ep)))
return true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java b/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java
index b5606d6..e91f6ac 100644
--- a/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java
+++ b/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java
@@ -17,8 +17,6 @@
*/
package org.apache.cassandra.locator;
-import java.net.InetAddress;
-
/**
* An endpoint snitch tells Cassandra information about network topology that it can use to route
* requests more efficiently.
@@ -30,16 +28,16 @@ public abstract class AbstractNetworkTopologySnitch extends AbstractEndpointSnit
* @param endpoint a specified endpoint
* @return string of rack
*/
- abstract public String getRack(InetAddress endpoint);
+ abstract public String getRack(InetAddressAndPort endpoint);
/**
* Return the data center for which an endpoint resides in
* @param endpoint a specified endpoint
* @return string of data center
*/
- abstract public String getDatacenter(InetAddress endpoint);
+ abstract public String getDatacenter(InetAddressAndPort endpoint);
- public int compareEndpoints(InetAddress address, InetAddress a1, InetAddress a2)
+ public int compareEndpoints(InetAddressAndPort address, InetAddressAndPort a1, InetAddressAndPort a2)
{
if (address.equals(a1) && !address.equals(a2))
return -1;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index c3498d9..3e9b5bb 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.locator;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
-import java.net.InetAddress;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
@@ -74,9 +73,9 @@ public abstract class AbstractReplicationStrategy
// lazy-initialize keyspace itself since we don't create them until after the replication strategies
}
- private final Map<Token, ArrayList<InetAddress>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>();
+ private final Map<Token, ArrayList<InetAddressAndPort>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddressAndPort>>();
- public ArrayList<InetAddress> getCachedEndpoints(Token t)
+ public ArrayList<InetAddressAndPort> getCachedEndpoints(Token t)
{
long lastVersion = tokenMetadata.getRingVersion();
@@ -103,21 +102,21 @@ public abstract class AbstractReplicationStrategy
* @param searchPosition the position the natural endpoints are requested for
* @return a copy of the natural endpoints for the given token
*/
- public ArrayList<InetAddress> getNaturalEndpoints(RingPosition searchPosition)
+ public ArrayList<InetAddressAndPort> getNaturalEndpoints(RingPosition searchPosition)
{
Token searchToken = searchPosition.getToken();
Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
- ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken);
+ ArrayList<InetAddressAndPort> endpoints = getCachedEndpoints(keyToken);
if (endpoints == null)
{
TokenMetadata tm = tokenMetadata.cachedOnlyTokenMap();
// if our cache got invalidated, it's possible there is a new token to account for too
keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken);
- endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tm));
+ endpoints = new ArrayList<InetAddressAndPort>(calculateNaturalEndpoints(searchToken, tm));
cachedEndpoints.put(keyToken, endpoints);
}
- return new ArrayList<InetAddress>(endpoints);
+ return new ArrayList<InetAddressAndPort>(endpoints);
}
/**
@@ -128,10 +127,10 @@ public abstract class AbstractReplicationStrategy
* @param searchToken the token the natural endpoints are requested for
* @return a copy of the natural endpoints for the given token
*/
- public abstract List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata);
+ public abstract List<InetAddressAndPort> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata);
- public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
- Collection<InetAddress> pendingEndpoints,
+ public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddressAndPort> naturalEndpoints,
+ Collection<InetAddressAndPort> pendingEndpoints,
ConsistencyLevel consistency_level,
Runnable callback,
WriteType writeType,
@@ -140,8 +139,8 @@ public abstract class AbstractReplicationStrategy
return getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel());
}
- public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
- Collection<InetAddress> pendingEndpoints,
+ public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddressAndPort> naturalEndpoints,
+ Collection<InetAddressAndPort> pendingEndpoints,
ConsistencyLevel consistency_level,
Runnable callback,
WriteType writeType,
@@ -211,14 +210,14 @@ public abstract class AbstractReplicationStrategy
* (fixing this would probably require merging tokenmetadata into replicationstrategy,
* so we could cache/invalidate cleanly.)
*/
- public Multimap<InetAddress, Range<Token>> getAddressRanges(TokenMetadata metadata)
+ public Multimap<InetAddressAndPort, Range<Token>> getAddressRanges(TokenMetadata metadata)
{
- Multimap<InetAddress, Range<Token>> map = HashMultimap.create();
+ Multimap<InetAddressAndPort, Range<Token>> map = HashMultimap.create();
for (Token token : metadata.sortedTokens())
{
Range<Token> range = metadata.getPrimaryRangeFor(token);
- for (InetAddress ep : calculateNaturalEndpoints(token, metadata))
+ for (InetAddressAndPort ep : calculateNaturalEndpoints(token, metadata))
{
map.put(ep, range);
}
@@ -227,14 +226,14 @@ public abstract class AbstractReplicationStrategy
return map;
}
- public Multimap<Range<Token>, InetAddress> getRangeAddresses(TokenMetadata metadata)
+ public Multimap<Range<Token>, InetAddressAndPort> getRangeAddresses(TokenMetadata metadata)
{
- Multimap<Range<Token>, InetAddress> map = HashMultimap.create();
+ Multimap<Range<Token>, InetAddressAndPort> map = HashMultimap.create();
for (Token token : metadata.sortedTokens())
{
Range<Token> range = metadata.getPrimaryRangeFor(token);
- for (InetAddress ep : calculateNaturalEndpoints(token, metadata))
+ for (InetAddressAndPort ep : calculateNaturalEndpoints(token, metadata))
{
map.put(range, ep);
}
@@ -243,17 +242,17 @@ public abstract class AbstractReplicationStrategy
return map;
}
- public Multimap<InetAddress, Range<Token>> getAddressRanges()
+ public Multimap<InetAddressAndPort, Range<Token>> getAddressRanges()
{
return getAddressRanges(tokenMetadata.cloneOnlyTokenMap());
}
- public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddress pendingAddress)
+ public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddressAndPort pendingAddress)
{
return getPendingAddressRanges(metadata, Arrays.asList(pendingToken), pendingAddress);
}
- public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Collection<Token> pendingTokens, InetAddress pendingAddress)
+ public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Collection<Token> pendingTokens, InetAddressAndPort pendingAddress)
{
TokenMetadata temp = metadata.cloneOnlyTokenMap();
temp.updateNormalTokens(pendingTokens, pendingAddress);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/CloudstackSnitch.java b/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
index ec2e87e..be6d3c4 100644
--- a/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
+++ b/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
@@ -24,7 +24,6 @@ import java.io.FileReader;
import java.io.IOException;
import java.io.File;
import java.net.HttpURLConnection;
-import java.net.InetAddress;
import java.net.URL;
import java.net.URI;
import java.nio.charset.StandardCharsets;
@@ -56,7 +55,7 @@ public class CloudstackSnitch extends AbstractNetworkTopologySnitch
protected static final Logger logger = LoggerFactory.getLogger(CloudstackSnitch.class);
protected static final String ZONE_NAME_QUERY_URI = "/latest/meta-data/availability-zone";
- private Map<InetAddress, Map<String, String>> savedEndpoints;
+ private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
private static final String DEFAULT_DC = "UNKNOWN-DC";
private static final String DEFAULT_RACK = "UNKNOWN-RACK";
@@ -83,9 +82,9 @@ public class CloudstackSnitch extends AbstractNetworkTopologySnitch
csZoneRack = zone_parts[2];
}
- public String getRack(InetAddress endpoint)
+ public String getRack(InetAddressAndPort endpoint)
{
- if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return csZoneRack;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.RACK) == null)
@@ -99,9 +98,9 @@ public class CloudstackSnitch extends AbstractNetworkTopologySnitch
return state.getApplicationState(ApplicationState.RACK).value;
}
- public String getDatacenter(InetAddress endpoint)
+ public String getDatacenter(InetAddressAndPort endpoint)
{
- if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return csZoneDc;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.DC) == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index 42fc26c..b9c9ba0 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -25,6 +25,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
import javax.management.MBeanServer;
@@ -63,8 +64,8 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
private String mbeanName;
private boolean registered = false;
- private volatile HashMap<InetAddress, Double> scores = new HashMap<>();
- private final ConcurrentHashMap<InetAddress, ExponentiallyDecayingReservoir> samples = new ConcurrentHashMap<>();
+ private volatile HashMap<InetAddressAndPort, Double> scores = new HashMap<>();
+ private final ConcurrentHashMap<InetAddressAndPort, ExponentiallyDecayingReservoir> samples = new ConcurrentHashMap<>();
public final IEndpointSnitch subsnitch;
@@ -174,27 +175,27 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
subsnitch.gossiperStarting();
}
- public String getRack(InetAddress endpoint)
+ public String getRack(InetAddressAndPort endpoint)
{
return subsnitch.getRack(endpoint);
}
- public String getDatacenter(InetAddress endpoint)
+ public String getDatacenter(InetAddressAndPort endpoint)
{
return subsnitch.getDatacenter(endpoint);
}
- public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses)
+ public List<InetAddressAndPort> getSortedListByProximity(final InetAddressAndPort address, Collection<InetAddressAndPort> addresses)
{
- List<InetAddress> list = new ArrayList<InetAddress>(addresses);
+ List<InetAddressAndPort> list = new ArrayList<>(addresses);
sortByProximity(address, list);
return list;
}
@Override
- public void sortByProximity(final InetAddress address, List<InetAddress> addresses)
+ public void sortByProximity(final InetAddressAndPort address, List<InetAddressAndPort> addresses)
{
- assert address.equals(FBUtilities.getBroadcastAddress()); // we only know about ourself
+ assert address.equals(FBUtilities.getBroadcastAddressAndPort()); // we only know about ourself
if (dynamicBadnessThreshold == 0)
{
sortByProximityWithScore(address, addresses);
@@ -205,32 +206,32 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
}
}
- private void sortByProximityWithScore(final InetAddress address, List<InetAddress> addresses)
+ private void sortByProximityWithScore(final InetAddressAndPort address, List<InetAddressAndPort> addresses)
{
// Scores can change concurrently from a call to this method. But Collections.sort() expects
// its comparator to be "stable", that is 2 endpoint should compare the same way for the duration
// of the sort() call. As we copy the scores map on write, it is thus enough to alias the current
// version of it during this call.
- final HashMap<InetAddress, Double> scores = this.scores;
- Collections.sort(addresses, new Comparator<InetAddress>()
+ final HashMap<InetAddressAndPort, Double> scores = this.scores;
+ Collections.sort(addresses, new Comparator<InetAddressAndPort>()
{
- public int compare(InetAddress a1, InetAddress a2)
+ public int compare(InetAddressAndPort a1, InetAddressAndPort a2)
{
return compareEndpoints(address, a1, a2, scores);
}
});
}
- private void sortByProximityWithBadness(final InetAddress address, List<InetAddress> addresses)
+ private void sortByProximityWithBadness(final InetAddressAndPort address, List<InetAddressAndPort> addresses)
{
if (addresses.size() < 2)
return;
subsnitch.sortByProximity(address, addresses);
- HashMap<InetAddress, Double> scores = this.scores; // Make sure the score don't change in the middle of the loop below
+ HashMap<InetAddressAndPort, Double> scores = this.scores; // Make sure the score don't change in the middle of the loop below
// (which wouldn't really matter here but its cleaner that way).
ArrayList<Double> subsnitchOrderedScores = new ArrayList<>(addresses.size());
- for (InetAddress inet : addresses)
+ for (InetAddressAndPort inet : addresses)
{
Double score = scores.get(inet);
if (score == null)
@@ -256,7 +257,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
}
// Compare endpoints given an immutable snapshot of the scores
- private int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2, Map<InetAddress, Double> scores)
+ private int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2, Map<InetAddressAndPort, Double> scores)
{
Double scored1 = scores.get(a1);
Double scored2 = scores.get(a2);
@@ -279,7 +280,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
return 1;
}
- public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+ public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
{
// That function is fundamentally unsafe because the scores can change at any time and so the result of that
// method is not stable for identical arguments. This is why we don't rely on super.sortByProximity() in
@@ -287,7 +288,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
throw new UnsupportedOperationException("You shouldn't wrap the DynamicEndpointSnitch (within itself or otherwise)");
}
- public void receiveTiming(InetAddress host, long latency) // this is cheap
+ public void receiveTiming(InetAddressAndPort host, long latency) // this is cheap
{
ExponentiallyDecayingReservoir sample = samples.get(host);
if (sample == null)
@@ -315,23 +316,23 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
}
double maxLatency = 1;
- Map<InetAddress, Snapshot> snapshots = new HashMap<>(samples.size());
- for (Map.Entry<InetAddress, ExponentiallyDecayingReservoir> entry : samples.entrySet())
+ Map<InetAddressAndPort, Snapshot> snapshots = new HashMap<>(samples.size());
+ for (Map.Entry<InetAddressAndPort, ExponentiallyDecayingReservoir> entry : samples.entrySet())
{
snapshots.put(entry.getKey(), entry.getValue().getSnapshot());
}
// We're going to weight the latency for each host against the worst one we see, to
// arrive at sort of a 'badness percentage' for them. First, find the worst for each:
- HashMap<InetAddress, Double> newScores = new HashMap<>();
- for (Map.Entry<InetAddress, Snapshot> entry : snapshots.entrySet())
+ HashMap<InetAddressAndPort, Double> newScores = new HashMap<>();
+ for (Map.Entry<InetAddressAndPort, Snapshot> entry : snapshots.entrySet())
{
double mean = entry.getValue().getMedian();
if (mean > maxLatency)
maxLatency = mean;
}
// now make another pass to do the weighting based on the maximums we found before
- for (Map.Entry<InetAddress, Snapshot> entry : snapshots.entrySet())
+ for (Map.Entry<InetAddressAndPort, Snapshot> entry : snapshots.entrySet())
{
double score = entry.getValue().getMedian() / maxLatency;
// finally, add the severity without any weighting, since hosts scale this relative to their own load and the size of the task causing the severity.
@@ -351,6 +352,11 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
public Map<InetAddress, Double> getScores()
{
+ return scores.entrySet().stream().collect(Collectors.toMap(address -> address.getKey().address, Map.Entry::getValue));
+ }
+
+ public Map<InetAddressAndPort, Double> getScoresWithPort()
+ {
return scores;
}
@@ -374,7 +380,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
public List<Double> dumpTimings(String hostname) throws UnknownHostException
{
- InetAddress host = InetAddress.getByName(hostname);
+ InetAddressAndPort host = InetAddressAndPort.getByName(hostname);
ArrayList<Double> timings = new ArrayList<Double>();
ExponentiallyDecayingReservoir sample = samples.get(host);
if (sample != null)
@@ -390,7 +396,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, StorageService.instance.valueFactory.severity(severity));
}
- private double getSeverity(InetAddress endpoint)
+ private double getSeverity(InetAddressAndPort endpoint)
{
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null)
@@ -405,10 +411,10 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
public double getSeverity()
{
- return getSeverity(FBUtilities.getBroadcastAddress());
+ return getSeverity(FBUtilities.getBroadcastAddressAndPort());
}
- public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2)
+ public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2)
{
if (!subsnitch.isWorthMergingForRangeQuery(merged, l1, l2))
return false;
@@ -428,10 +434,10 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
}
// Return the max score for the endpoint in the provided list, or -1.0 if no node have a score.
- private double maxScore(List<InetAddress> endpoints)
+ private double maxScore(List<InetAddressAndPort> endpoints)
{
double maxScore = -1.0;
- for (InetAddress endpoint : endpoints)
+ for (InetAddressAndPort endpoint : endpoints)
{
Double score = scores.get(endpoint);
if (score == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
index bfafa75..61f0d97 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
@@ -24,6 +24,8 @@ import java.util.List;
public interface DynamicEndpointSnitchMBean
{
+ public Map<InetAddressAndPort, Double> getScoresWithPort();
+ @Deprecated
public Map<InetAddress, Double> getScores();
public int getUpdateInterval();
public int getResetInterval();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java b/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
index b32ca84..2a6c7e9 100644
--- a/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
+++ b/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.locator;
import java.io.IOException;
import java.net.InetAddress;
+import java.net.UnknownHostException;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -62,6 +63,16 @@ public class Ec2MultiRegionSnitch extends Ec2Snitch
public void gossiperStarting()
{
super.gossiperStarting();
+ InetAddressAndPort address;
+ try
+ {
+ address = InetAddressAndPort.getByName(localPrivateAddress);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT, StorageService.instance.valueFactory.internalAddressAndPort(address));
Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, StorageService.instance.valueFactory.internalIP(localPrivateAddress));
Gossiper.instance.register(new ReconnectableSnitchHelper(this, ec2region, true));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/Ec2Snitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Ec2Snitch.java b/src/java/org/apache/cassandra/locator/Ec2Snitch.java
index 59eb27b..c7324c8 100644
--- a/src/java/org/apache/cassandra/locator/Ec2Snitch.java
+++ b/src/java/org/apache/cassandra/locator/Ec2Snitch.java
@@ -21,7 +21,6 @@ import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
-import java.net.InetAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Map;
@@ -46,7 +45,7 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch
protected static final String ZONE_NAME_QUERY_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone";
private static final String DEFAULT_DC = "UNKNOWN-DC";
private static final String DEFAULT_RACK = "UNKNOWN-RACK";
- private Map<InetAddress, Map<String, String>> savedEndpoints;
+ private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
protected String ec2zone;
protected String ec2region;
@@ -92,9 +91,9 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch
}
}
- public String getRack(InetAddress endpoint)
+ public String getRack(InetAddressAndPort endpoint)
{
- if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return ec2zone;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.RACK) == null)
@@ -108,9 +107,9 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch
return state.getApplicationState(ApplicationState.RACK).value;
}
- public String getDatacenter(InetAddress endpoint)
+ public String getDatacenter(InetAddressAndPort endpoint)
{
- if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return ec2region;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.DC) == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
index bbfabb6..c06d765 100644
--- a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
+++ b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.locator;
import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -44,22 +43,22 @@ public class EndpointSnitchInfo implements EndpointSnitchInfoMBean
public String getDatacenter(String host) throws UnknownHostException
{
- return DatabaseDescriptor.getEndpointSnitch().getDatacenter(InetAddress.getByName(host));
+ return DatabaseDescriptor.getEndpointSnitch().getDatacenter(InetAddressAndPort.getByName(host));
}
public String getRack(String host) throws UnknownHostException
{
- return DatabaseDescriptor.getEndpointSnitch().getRack(InetAddress.getByName(host));
+ return DatabaseDescriptor.getEndpointSnitch().getRack(InetAddressAndPort.getByName(host));
}
public String getDatacenter()
{
- return DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+ return DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
}
public String getRack()
{
- return DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddress());
+ return DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
}
public String getSnitchName()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java b/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java
index b4d3b19..1e1c500 100644
--- a/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java
+++ b/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java
@@ -21,7 +21,6 @@ import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
-import java.net.InetAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Map;
@@ -46,7 +45,7 @@ public class GoogleCloudSnitch extends AbstractNetworkTopologySnitch
protected static final String ZONE_NAME_QUERY_URL = "http://metadata.google.internal/computeMetadata/v1/instance/zone";
private static final String DEFAULT_DC = "UNKNOWN-DC";
private static final String DEFAULT_RACK = "UNKNOWN-RACK";
- private Map<InetAddress, Map<String, String>> savedEndpoints;
+ private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
protected String gceZone;
protected String gceRegion;
@@ -94,9 +93,9 @@ public class GoogleCloudSnitch extends AbstractNetworkTopologySnitch
}
}
- public String getRack(InetAddress endpoint)
+ public String getRack(InetAddressAndPort endpoint)
{
- if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return gceZone;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.RACK) == null)
@@ -110,9 +109,9 @@ public class GoogleCloudSnitch extends AbstractNetworkTopologySnitch
return state.getApplicationState(ApplicationState.RACK).value;
}
- public String getDatacenter(InetAddress endpoint)
+ public String getDatacenter(InetAddressAndPort endpoint)
{
- if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return gceRegion;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.DC) == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
index e2449ae..75b5685 100644
--- a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
+++ b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.locator;
-import java.net.InetAddress;
import java.util.concurrent.atomic.AtomicReference;
import java.util.Map;
@@ -45,7 +44,7 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch//
private final boolean preferLocal;
private final AtomicReference<ReconnectableSnitchHelper> snitchHelperReference;
- private Map<InetAddress, Map<String, String>> savedEndpoints;
+ private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
private static final String DEFAULT_DC = "UNKNOWN_DC";
private static final String DEFAULT_RACK = "UNKNOWN_RACK";
@@ -84,9 +83,9 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch//
* @param endpoint the endpoint to process
* @return string of data center
*/
- public String getDatacenter(InetAddress endpoint)
+ public String getDatacenter(InetAddressAndPort endpoint)
{
- if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return myDC;
EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
@@ -112,9 +111,9 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch//
* @param endpoint the endpoint to process
* @return string of rack
*/
- public String getRack(InetAddress endpoint)
+ public String getRack(InetAddressAndPort endpoint)
{
- if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return myRack;
EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
@@ -138,8 +137,10 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch//
{
super.gossiperStarting();
+ Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT,
+ StorageService.instance.valueFactory.internalAddressAndPort(FBUtilities.getLocalAddressAndPort()));
Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP,
- StorageService.instance.valueFactory.internalIP(FBUtilities.getLocalAddress().getHostAddress()));
+ StorageService.instance.valueFactory.internalIP(FBUtilities.getJustLocalAddress().getHostAddress()));
loadGossiperState();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
index 71b441c..00a1543 100644
--- a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.locator;
-import java.net.InetAddress;
import java.util.Collection;
import java.util.List;
@@ -32,27 +31,27 @@ public interface IEndpointSnitch
/**
* returns a String representing the rack this endpoint belongs to
*/
- public String getRack(InetAddress endpoint);
+ public String getRack(InetAddressAndPort endpoint);
/**
* returns a String representing the datacenter this endpoint belongs to
*/
- public String getDatacenter(InetAddress endpoint);
+ public String getDatacenter(InetAddressAndPort endpoint);
/**
* returns a new <tt>List</tt> sorted by proximity to the given endpoint
*/
- public List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress);
+ public List<InetAddressAndPort> getSortedListByProximity(InetAddressAndPort address, Collection<InetAddressAndPort> unsortedAddress);
/**
* This method will sort the <tt>List</tt> by proximity to the given address.
*/
- public void sortByProximity(InetAddress address, List<InetAddress> addresses);
+ public void sortByProximity(InetAddressAndPort address, List<InetAddressAndPort> addresses);
/**
* compares two endpoints in relation to the target endpoint, returning as Comparator.compare would
*/
- public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2);
+ public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2);
/**
* called after Gossiper instance exists immediately before it starts gossiping
@@ -63,5 +62,5 @@ public interface IEndpointSnitch
* Returns whether for a range query doing a query against merged is likely
* to be faster than 2 sequential queries, one against l1 followed by one against l2.
*/
- public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2);
+ public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/ILatencySubscriber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ILatencySubscriber.java b/src/java/org/apache/cassandra/locator/ILatencySubscriber.java
index d2ae6db..f6c1c7f 100644
--- a/src/java/org/apache/cassandra/locator/ILatencySubscriber.java
+++ b/src/java/org/apache/cassandra/locator/ILatencySubscriber.java
@@ -17,9 +17,7 @@
*/
package org.apache.cassandra.locator;
-import java.net.InetAddress;
-
public interface ILatencySubscriber
{
- public void receiveTiming(InetAddress address, long latency);
+ public void receiveTiming(InetAddressAndPort address, long latency);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
new file mode 100644
index 0000000..38a1a49
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.net.HostAndPort;
+
+import org.apache.cassandra.utils.FastByteOperations;
+
+/**
+ * A class to replace the usage of InetAddress to identify hosts in the cluster.
+ * Opting for a full replacement class so that in the future if we change the nature
+ * of the identifier the refactor will be easier in that we don't have to change the type
+ * just the methods.
+ *
+ * Because an IP might contain multiple C* instances the identification must be done
+ * using the IP + port. InetSocketAddress is undesirable for a couple of reasons. It's not comparable,
+ * it's toString() method doesn't correctly bracket IPv6, it doesn't handle optional default values,
+ * and a couple of other minor behaviors that are slightly less troublesome like handling the
+ * need to sometimes return a port and sometimes not.
+ *
+ */
+public final class InetAddressAndPort implements Comparable<InetAddressAndPort>, Serializable
+{
+ private static final long serialVersionUID = 0;
+
+ //Store these here to avoid requiring DatabaseDescriptor to be loaded. DatabaseDescriptor will set
+ //these when it loads the config. A lot of unit tests won't end up loading DatabaseDescriptor.
+ //Tools that might use this class also might not load database descriptor. Those tools are expected
+ //to always override the defaults.
+ static volatile int defaultPort = 7000;
+
+ public final InetAddress address;
+ public final byte[] addressBytes;
+ public final int port;
+
+ private InetAddressAndPort(InetAddress address, byte[] addressBytes, int port)
+ {
+ Preconditions.checkNotNull(address);
+ Preconditions.checkNotNull(addressBytes);
+ validatePortRange(port);
+ this.address = address;
+ this.port = port;
+ this.addressBytes = addressBytes;
+ }
+
+ private static void validatePortRange(int port)
+ {
+ if (port < 0 | port > 65535)
+ {
+ throw new IllegalArgumentException("Port " + port + " is not a valid port number in the range 0-65535");
+ }
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ InetAddressAndPort that = (InetAddressAndPort) o;
+
+ if (port != that.port) return false;
+ return address.equals(that.address);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = address.hashCode();
+ result = 31 * result + port;
+ return result;
+ }
+
+ @Override
+ public int compareTo(InetAddressAndPort o)
+ {
+ int retval = FastByteOperations.compareUnsigned(addressBytes, 0, addressBytes.length, o.addressBytes, 0, o.addressBytes.length);
+ if (retval != 0)
+ {
+ return retval;
+ }
+
+ return Integer.compare(port, o.port);
+ }
+
+ public String getHostAddress(boolean withPort)
+ {
+ if (withPort)
+ {
+ return toString();
+ }
+ else
+ {
+ return address.getHostAddress();
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return toString(true);
+ }
+
+ public String toString(boolean withPort)
+ {
+ if (withPort)
+ {
+ return HostAndPort.fromParts(address.getHostAddress(), port).toString();
+ }
+ else
+ {
+ return address.toString();
+ }
+ }
+
+ public static InetAddressAndPort getByName(String name) throws UnknownHostException
+ {
+ return getByNameOverrideDefaults(name, null);
+ }
+
+ /**
+ *
+ * @param name Hostname + optional ports string
+ * @param port Port to connect on, overridden by values in hostname string, defaults to DatabaseDescriptor default if not specified anywhere.
+ * @return
+ * @throws UnknownHostException
+ */
+ public static InetAddressAndPort getByNameOverrideDefaults(String name, Integer port) throws UnknownHostException
+ {
+ HostAndPort hap = HostAndPort.fromString(name);
+ if (hap.hasPort())
+ {
+ port = hap.getPort();
+ }
+ return getByAddressOverrideDefaults(InetAddress.getByName(hap.getHost()), port);
+ }
+
+ public static InetAddressAndPort getByAddress(byte[] address) throws UnknownHostException
+ {
+ return getByAddressOverrideDefaults(InetAddress.getByAddress(address), address, null);
+ }
+
+ public static InetAddressAndPort getByAddress(InetAddress address)
+ {
+ return getByAddressOverrideDefaults(address, null);
+ }
+
+ public static InetAddressAndPort getByAddressOverrideDefaults(InetAddress address, Integer port)
+ {
+ if (port == null)
+ {
+ port = defaultPort;
+ }
+
+ return new InetAddressAndPort(address, address.getAddress(), port);
+ }
+
+ public static InetAddressAndPort getByAddressOverrideDefaults(InetAddress address, byte[] addressBytes, Integer port)
+ {
+ if (port == null)
+ {
+ port = defaultPort;
+ }
+
+ return new InetAddressAndPort(address, addressBytes, port);
+ }
+
+ public static InetAddressAndPort getLoopbackAddress()
+ {
+ return InetAddressAndPort.getByAddress(InetAddress.getLoopbackAddress());
+ }
+
+ public static InetAddressAndPort getLocalHost() throws UnknownHostException
+ {
+ return InetAddressAndPort.getByAddress(InetAddress.getLocalHost());
+ }
+
+ public static void initializeDefaultPort(int port)
+ {
+ defaultPort = port;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/LocalStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/LocalStrategy.java b/src/java/org/apache/cassandra/locator/LocalStrategy.java
index ae58203..a76fe96 100644
--- a/src/java/org/apache/cassandra/locator/LocalStrategy.java
+++ b/src/java/org/apache/cassandra/locator/LocalStrategy.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.locator;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Collection;
@@ -42,16 +41,16 @@ public class LocalStrategy extends AbstractReplicationStrategy
* LocalStrategy may be used before tokens are set up.
*/
@Override
- public ArrayList<InetAddress> getNaturalEndpoints(RingPosition searchPosition)
+ public ArrayList<InetAddressAndPort> getNaturalEndpoints(RingPosition searchPosition)
{
- ArrayList<InetAddress> l = new ArrayList<InetAddress>(1);
- l.add(FBUtilities.getBroadcastAddress());
+ ArrayList<InetAddressAndPort> l = new ArrayList<InetAddressAndPort>(1);
+ l.add(FBUtilities.getBroadcastAddressAndPort());
return l;
}
- public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+ public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
{
- return Collections.singletonList(FBUtilities.getBroadcastAddress());
+ return Collections.singletonList(FBUtilities.getBroadcastAddressAndPort());
}
public int getReplicationFactor()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 442e6cf..673c018 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.locator;
-import java.net.InetAddress;
import java.util.*;
import java.util.Map.Entry;
@@ -72,7 +71,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
}
datacenters = Collections.unmodifiableMap(newDatacenters);
- logger.trace("Configured datacenter replicas are {}", FBUtilities.toString(datacenters));
+ logger.info("Configured datacenter replicas are {}", FBUtilities.toString(datacenters));
}
/**
@@ -81,7 +80,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
private static final class DatacenterEndpoints
{
/** List accepted endpoints get pushed into. */
- Set<InetAddress> endpoints;
+ Set<InetAddressAndPort> endpoints;
/**
* Racks encountered so far. Replicas are put into separate racks while possible.
* For efficiency the set is shared between the instances, using the location pair (dc, rack) to make sure
@@ -93,7 +92,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
int rfLeft;
int acceptableRackRepeats;
- DatacenterEndpoints(int rf, int rackCount, int nodeCount, Set<InetAddress> endpoints, Set<Pair<String, String>> racks)
+ DatacenterEndpoints(int rf, int rackCount, int nodeCount, Set<InetAddressAndPort> endpoints, Set<Pair<String, String>> racks)
{
this.endpoints = endpoints;
this.racks = racks;
@@ -108,7 +107,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
* Attempts to add an endpoint to the replicas for this datacenter, adding to the endpoints set if successful.
* Returns true if the endpoint was added, and this datacenter does not require further replicas.
*/
- boolean addEndpointAndCheckIfDone(InetAddress ep, Pair<String,String> location)
+ boolean addEndpointAndCheckIfDone(InetAddressAndPort ep, Pair<String,String> location)
{
if (done())
return false;
@@ -143,17 +142,17 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
/**
* calculate endpoints in one pass through the tokens by tracking our progress in each DC.
*/
- public List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata)
+ public List<InetAddressAndPort> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata)
{
// we want to preserve insertion order so that the first added endpoint becomes primary
- Set<InetAddress> replicas = new LinkedHashSet<>();
+ Set<InetAddressAndPort> replicas = new LinkedHashSet<>();
Set<Pair<String, String>> seenRacks = new HashSet<>();
Topology topology = tokenMetadata.getTopology();
// all endpoints in each DC, so we can check when we have exhausted all the members of a DC
- Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+ Multimap<String, InetAddressAndPort> allEndpoints = topology.getDatacenterEndpoints();
// all racks in a DC so we can check when we have exhausted all racks in a DC
- Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+ Map<String, Multimap<String, InetAddressAndPort>> racks = topology.getDatacenterRacks();
assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members";
int dcsToFill = 0;
@@ -178,7 +177,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
while (dcsToFill > 0 && tokenIter.hasNext())
{
Token next = tokenIter.next();
- InetAddress ep = tokenMetadata.getEndpoint(next);
+ InetAddressAndPort ep = tokenMetadata.getEndpoint(next);
Pair<String, String> location = topology.getLocation(ep);
DatacenterEndpoints dcEndpoints = dcs.get(location.left);
if (dcEndpoints != null && dcEndpoints.addEndpointAndCheckIfDone(ep, location))
@@ -227,9 +226,9 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
// Add data center of localhost.
- validDataCenters.add(snitch.getDatacenter(FBUtilities.getBroadcastAddress()));
+ validDataCenters.add(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()));
// Fetch and add DCs of all peers.
- for (final InetAddress peer : StorageService.instance.getTokenMetadata().getAllEndpoints())
+ for (final InetAddressAndPort peer : StorageService.instance.getTokenMetadata().getAllEndpoints())
{
validDataCenters.add(snitch.getDatacenter(peer));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
index b9bd767..93e629e 100644
--- a/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.locator;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Collection;
@@ -42,10 +41,10 @@ public class OldNetworkTopologyStrategy extends AbstractReplicationStrategy
super(keyspaceName, tokenMetadata, snitch, configOptions);
}
- public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+ public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
{
int replicas = getReplicationFactor();
- List<InetAddress> endpoints = new ArrayList<InetAddress>(replicas);
+ List<InetAddressAndPort> endpoints = new ArrayList<>(replicas);
ArrayList<Token> tokens = metadata.sortedTokens();
if (tokens.isEmpty())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org