You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/25 20:12:05 UTC
[08/19] cassandra git commit: Allow storage port to be configurable
per node
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 48e1b2f..29b45b4 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -37,7 +37,8 @@ public interface StorageServiceMBean extends NotificationEmitter
*
* @return set of IP addresses, as Strings
*/
- public List<String> getLiveNodes();
+ @Deprecated public List<String> getLiveNodes();
+ public List<String> getLiveNodesWithPort();
/**
* Retrieve the list of unreachable nodes in the cluster, as determined
@@ -45,28 +46,32 @@ public interface StorageServiceMBean extends NotificationEmitter
*
* @return set of IP addresses, as Strings
*/
- public List<String> getUnreachableNodes();
+ @Deprecated public List<String> getUnreachableNodes();
+ public List<String> getUnreachableNodesWithPort();
/**
* Retrieve the list of nodes currently bootstrapping into the ring.
*
* @return set of IP addresses, as Strings
*/
- public List<String> getJoiningNodes();
+ @Deprecated public List<String> getJoiningNodes();
+ public List<String> getJoiningNodesWithPort();
/**
* Retrieve the list of nodes currently leaving the ring.
*
* @return set of IP addresses, as Strings
*/
- public List<String> getLeavingNodes();
+ @Deprecated public List<String> getLeavingNodes();
+ public List<String> getLeavingNodesWithPort();
/**
* Retrieve the list of nodes currently moving in the ring.
*
* @return set of IP addresses, as Strings
*/
- public List<String> getMovingNodes();
+ @Deprecated public List<String> getMovingNodes();
+ public List<String> getMovingNodesWithPort();
/**
* Fetch string representations of the tokens for this node.
@@ -120,7 +125,8 @@ public interface StorageServiceMBean extends NotificationEmitter
*
* @return mapping of ranges to end points
*/
- public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace);
+ @Deprecated public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace);
+ public Map<List<String>, List<String>> getRangeToEndpointWithPortMap(String keyspace);
/**
* Retrieve a map of range to rpc addresses that describe the ring topology
@@ -128,7 +134,8 @@ public interface StorageServiceMBean extends NotificationEmitter
*
* @return mapping of ranges to rpc addresses
*/
- public Map<List<String>, List<String>> getRangeToRpcaddressMap(String keyspace);
+ @Deprecated public Map<List<String>, List<String>> getRangeToRpcaddressMap(String keyspace);
+ public Map<List<String>, List<String>> getRangeToNativeaddressWithPortMap(String keyspace);
/**
* The same as {@code describeRing(String)} but converts TokenRange to the String for JMX compatibility
@@ -137,14 +144,16 @@ public interface StorageServiceMBean extends NotificationEmitter
*
* @return a List of TokenRange(s) converted to String for the given keyspace
*/
- public List <String> describeRingJMX(String keyspace) throws IOException;
+ @Deprecated public List <String> describeRingJMX(String keyspace) throws IOException;
+ public List<String> describeRingWithPortJMX(String keyspace) throws IOException;
/**
* Retrieve a map of pending ranges to endpoints that describe the ring topology
* @param keyspace the keyspace to get the pending range map for.
* @return a map of pending ranges to endpoints
*/
- public Map<List<String>, List<String>> getPendingRangeToEndpointMap(String keyspace);
+ @Deprecated public Map<List<String>, List<String>> getPendingRangeToEndpointMap(String keyspace);
+ public Map<List<String>, List<String>> getPendingRangeToEndpointWithPortMap(String keyspace);
/**
* Retrieve a map of tokens to endpoints, including the bootstrapping
@@ -152,7 +161,8 @@ public interface StorageServiceMBean extends NotificationEmitter
*
* @return a map of tokens to endpoints in ascending order
*/
- public Map<String, String> getTokenToEndpointMap();
+ @Deprecated public Map<String, String> getTokenToEndpointMap();
+ public Map<String, String> getTokenToEndpointWithPortMap();
/** Retrieve this hosts unique ID */
public String getLocalHostId();
@@ -162,16 +172,19 @@ public interface StorageServiceMBean extends NotificationEmitter
public Map<String, String> getHostIdMap();
/** Retrieve the mapping of endpoint to host ID */
- public Map<String, String> getEndpointToHostId();
+ @Deprecated public Map<String, String> getEndpointToHostId();
+ public Map<String, String> getEndpointWithPortToHostId();
/** Retrieve the mapping of host ID to endpoint */
- public Map<String, String> getHostIdToEndpoint();
+ @Deprecated public Map<String, String> getHostIdToEndpoint();
+ public Map<String, String> getHostIdToEndpointWithPort();
/** Human-readable load value */
public String getLoadString();
/** Human-readable load value. Keys are IP addresses. */
- public Map<String, String> getLoadMap();
+ @Deprecated public Map<String, String> getLoadMap();
+ public Map<String, String> getLoadMapWithPort();
/**
* Return the generation value for this node.
@@ -189,8 +202,10 @@ public interface StorageServiceMBean extends NotificationEmitter
* @param key - key for which we need to find the endpoint return value -
* the endpoint responsible for this key
*/
- public List<InetAddress> getNaturalEndpoints(String keyspaceName, String cf, String key);
- public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key);
+ @Deprecated public List<InetAddress> getNaturalEndpoints(String keyspaceName, String cf, String key);
+ public List<String> getNaturalEndpointsWithPort(String keyspaceName, String cf, String key);
+ @Deprecated public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key);
+ public List<String> getNaturalEndpointsWithPort(String keyspaceName, ByteBuffer key);
/**
* @deprecated use {@link #takeSnapshot(String tag, Map options, String... entities)} instead.
@@ -353,7 +368,8 @@ public interface StorageServiceMBean extends NotificationEmitter
/**
* Get the status of a token removal.
*/
- public String getRemovalStatus();
+ @Deprecated public String getRemovalStatus();
+ public String getRemovalStatusWithPort();
/**
* Force a remove operation to finish.
@@ -408,7 +424,8 @@ public interface StorageServiceMBean extends NotificationEmitter
* given a list of tokens (representing the nodes in the cluster), returns
* a mapping from {@code "token -> %age of cluster owned by that token"}
*/
- public Map<InetAddress, Float> getOwnership();
+ @Deprecated public Map<InetAddress, Float> getOwnership();
+ public Map<String, Float> getOwnershipWithPort();
/**
* Effective ownership is % of the data each node owns given the keyspace
@@ -417,7 +434,8 @@ public interface StorageServiceMBean extends NotificationEmitter
* in the cluster have the same replication strategies and if yes then we will
* use the first, else an empty Map is returned.
*/
- public Map<InetAddress, Float> effectiveOwnership(String keyspace) throws IllegalStateException;
+ @Deprecated public Map<InetAddress, Float> effectiveOwnership(String keyspace) throws IllegalStateException;
+ public Map<String, Float> effectiveOwnershipWithPort(String keyspace) throws IllegalStateException;
public List<String> getKeyspaces();
@@ -425,7 +443,8 @@ public interface StorageServiceMBean extends NotificationEmitter
public List<String> getNonLocalStrategyKeyspaces();
- public Map<String, String> getViewBuildStatuses(String keyspace, String view);
+ @Deprecated public Map<String, String> getViewBuildStatuses(String keyspace, String view);
+ public Map<String, String> getViewBuildStatusesWithPort(String keyspace, String view);
/**
* Change endpointsnitch class and dynamic-ness (and dynamic attributes) at runtime.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/TokenRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/TokenRange.java b/src/java/org/apache/cassandra/service/TokenRange.java
index 0e46910..a1f9aee 100644
--- a/src/java/org/apache/cassandra/service/TokenRange.java
+++ b/src/java/org/apache/cassandra/service/TokenRange.java
@@ -17,13 +17,13 @@
*/
package org.apache.cassandra.service;
-import java.net.InetAddress;
import java.util.*;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
/**
* Holds token range information for the sake of {@link StorageService#describeRing}.
@@ -54,13 +54,13 @@ public class TokenRange
return tokenFactory.toString(tk);
}
- public static TokenRange create(Token.TokenFactory tokenFactory, Range<Token> range, List<InetAddress> endpoints)
+ public static TokenRange create(Token.TokenFactory tokenFactory, Range<Token> range, List<InetAddressAndPort> endpoints, boolean withPorts)
{
List<EndpointDetails> details = new ArrayList<>(endpoints.size());
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- for (InetAddress ep : endpoints)
+ for (InetAddressAndPort ep : endpoints)
details.add(new EndpointDetails(ep,
- StorageService.instance.getRpcaddress(ep),
+ StorageService.instance.getNativeaddress(ep, withPorts),
snitch.getDatacenter(ep),
snitch.getRack(ep)));
return new TokenRange(tokenFactory, range, details);
@@ -69,6 +69,11 @@ public class TokenRange
@Override
public String toString()
{
+ return toString(false);
+ }
+
+ public String toString(boolean withPorts)
+ {
StringBuilder sb = new StringBuilder("TokenRange(");
sb.append("start_token:").append(toStr(range.left));
@@ -76,33 +81,43 @@ public class TokenRange
List<String> hosts = new ArrayList<>(endpoints.size());
List<String> rpcs = new ArrayList<>(endpoints.size());
+ List<String> endpointDetails = new ArrayList<>(endpoints.size());
for (EndpointDetails ep : endpoints)
{
- hosts.add(ep.host.getHostAddress());
- rpcs.add(ep.rpcAddress);
+ hosts.add(ep.host.getHostAddress(withPorts));
+ rpcs.add(ep.nativeAddress);
+ endpointDetails.add(ep.toString(withPorts));
}
- sb.append("endpoints:").append(hosts);
- sb.append("rpc_endpoints:").append(rpcs);
- sb.append("endpoint_details:").append(endpoints);
-
+ if (withPorts)
+ {
+ sb.append(", endpoints:").append(hosts);
+ sb.append(", rpc_endpoints:").append(rpcs);
+ sb.append(", endpoint_details:").append(endpointDetails);
+ }
+ else
+ {
+ sb.append("endpoints:").append(hosts);
+ sb.append("rpc_endpoints:").append(rpcs);
+ sb.append("endpoint_details:").append(endpointDetails);
+ }
sb.append(")");
return sb.toString();
}
public static class EndpointDetails
{
- public final InetAddress host;
- public final String rpcAddress;
+ public final InetAddressAndPort host;
+ public final String nativeAddress;
public final String datacenter;
public final String rack;
- private EndpointDetails(InetAddress host, String rpcAddress, String datacenter, String rack)
+ private EndpointDetails(InetAddressAndPort host, String nativeAddress, String datacenter, String rack)
{
// dc and rack can be null, but host shouldn't
assert host != null;
this.host = host;
- this.rpcAddress = rpcAddress;
+ this.nativeAddress = nativeAddress;
this.datacenter = datacenter;
this.rack = rack;
}
@@ -110,10 +125,15 @@ public class TokenRange
@Override
public String toString()
{
+ return toString(false);
+ }
+
+ public String toString(boolean withPorts)
+ {
// Format matters for backward compatibility with describeRing()
String dcStr = datacenter == null ? "" : String.format(", datacenter:%s", datacenter);
String rackStr = rack == null ? "" : String.format(", rack:%s", rack);
- return String.format("EndpointDetails(host:%s%s%s)", host.getHostAddress(), dcStr, rackStr);
+ return String.format("EndpointDetails(host:%s%s%s)", host.getHostAddress(withPorts), dcStr, rackStr);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 55ca5aa..65efeff 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.service;
-import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -27,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.WriteType;
@@ -42,8 +42,8 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
private static final AtomicIntegerFieldUpdater<WriteResponseHandler> responsesUpdater
= AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "responses");
- public WriteResponseHandler(Collection<InetAddress> writeEndpoints,
- Collection<InetAddress> pendingEndpoints,
+ public WriteResponseHandler(Collection<InetAddressAndPort> writeEndpoints,
+ Collection<InetAddressAndPort> pendingEndpoints,
ConsistencyLevel consistencyLevel,
Keyspace keyspace,
Runnable callback,
@@ -54,12 +54,12 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
responses = totalBlockFor();
}
- public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback, long queryStartNanoTime)
+ public WriteResponseHandler(InetAddressAndPort endpoint, WriteType writeType, Runnable callback, long queryStartNanoTime)
{
- this(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ONE, null, callback, writeType, queryStartNanoTime);
+ this(Arrays.asList(endpoint), Collections.<InetAddressAndPort>emptyList(), ConsistencyLevel.ONE, null, callback, writeType, queryStartNanoTime);
}
- public WriteResponseHandler(InetAddress endpoint, WriteType writeType, long queryStartNanoTime)
+ public WriteResponseHandler(InetAddressAndPort endpoint, WriteType writeType, long queryStartNanoTime)
{
this(endpoint, writeType, null, queryStartNanoTime);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index 381c498..ed70e96 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.service.paxos;
*/
-import java.net.InetAddress;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -29,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
@@ -48,7 +48,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
public Commit mostRecentInProgressCommit;
public Commit mostRecentInProgressCommitWithUpdate;
- private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>();
+ private final Map<InetAddressAndPort, Commit> commitsByReplica = new ConcurrentHashMap<>();
public PrepareCallback(DecoratedKey key, TableMetadata metadata, int targets, ConsistencyLevel consistency, long queryStartNanoTime)
{
@@ -90,7 +90,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
latch.countDown();
}
- public Iterable<InetAddress> replicasMissingMostRecentCommit(TableMetadata metadata, int nowInSec)
+ public Iterable<InetAddressAndPort> replicasMissingMostRecentCommit(TableMetadata metadata, int nowInSec)
{
// In general, we need every replicas that have answered to the prepare (a quorum) to agree on the MRC (see
// comment in StorageProxy.beginAndRepairPaxos(), but basically we need to make sure at least a quorum of nodes
@@ -105,9 +105,9 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
if (UUIDGen.unixTimestampInSec(mostRecentCommit.ballot) + paxosTtlSec < nowInSec)
return Collections.emptySet();
- return Iterables.filter(commitsByReplica.keySet(), new Predicate<InetAddress>()
+ return Iterables.filter(commitsByReplica.keySet(), new Predicate<InetAddressAndPort>()
{
- public boolean apply(InetAddress inetAddress)
+ public boolean apply(InetAddressAndPort inetAddress)
{
return (!commitsByReplica.get(inetAddress).ballot.equals(mostRecentCommit.ballot));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/ProgressInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
index fdd3e97..2334599 100644
--- a/src/java/org/apache/cassandra/streaming/ProgressInfo.java
+++ b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
@@ -18,10 +18,11 @@
package org.apache.cassandra.streaming;
import java.io.Serializable;
-import java.net.InetAddress;
import com.google.common.base.Objects;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
/**
* ProgressInfo contains file transfer progress.
*/
@@ -48,14 +49,14 @@ public class ProgressInfo implements Serializable
}
}
- public final InetAddress peer;
+ public final InetAddressAndPort peer;
public final int sessionIndex;
public final String fileName;
public final Direction direction;
public final long currentBytes;
public final long totalBytes;
- public ProgressInfo(InetAddress peer, int sessionIndex, String fileName, Direction direction, long currentBytes, long totalBytes)
+ public ProgressInfo(InetAddressAndPort peer, int sessionIndex, String fileName, Direction direction, long currentBytes, long totalBytes)
{
assert totalBytes > 0;
@@ -102,13 +103,18 @@ public class ProgressInfo implements Serializable
@Override
public String toString()
{
+ return toString(false);
+ }
+
+ public String toString(boolean withPorts)
+ {
StringBuilder sb = new StringBuilder(fileName);
sb.append(" ").append(currentBytes);
sb.append("/").append(totalBytes).append(" bytes");
sb.append("(").append(currentBytes*100/totalBytes).append("%) ");
sb.append(direction == Direction.OUT ? "sent to " : "received from ");
sb.append("idx:").append(sessionIndex);
- sb.append(peer);
+ sb.append(peer.toString(withPorts));
return sb.toString();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/SessionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java b/src/java/org/apache/cassandra/streaming/SessionInfo.java
index 1521614..bbca753 100644
--- a/src/java/org/apache/cassandra/streaming/SessionInfo.java
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.streaming;
import java.io.Serializable;
-import java.net.InetAddress;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -27,6 +26,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -34,9 +34,9 @@ import org.apache.cassandra.utils.FBUtilities;
*/
public final class SessionInfo implements Serializable
{
- public final InetAddress peer;
+ public final InetAddressAndPort peer;
public final int sessionIndex;
- public final InetAddress connecting;
+ public final InetAddressAndPort connecting;
/** Immutable collection of receiving summaries */
public final Collection<StreamSummary> receivingSummaries;
/** Immutable collection of sending summaries*/
@@ -47,9 +47,9 @@ public final class SessionInfo implements Serializable
private final Map<String, ProgressInfo> receivingFiles;
private final Map<String, ProgressInfo> sendingFiles;
- public SessionInfo(InetAddress peer,
+ public SessionInfo(InetAddressAndPort peer,
int sessionIndex,
- InetAddress connecting,
+ InetAddressAndPort connecting,
Collection<StreamSummary> receivingSummaries,
Collection<StreamSummary> sendingSummaries,
StreamSession.State state)
@@ -195,6 +195,6 @@ public final class SessionInfo implements Serializable
public SessionSummary createSummary()
{
- return new SessionSummary(FBUtilities.getBroadcastAddress(), peer, receivingSummaries, sendingSummaries);
+ return new SessionSummary(FBUtilities.getBroadcastAddressAndPort(), peer, receivingSummaries, sendingSummaries);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/SessionSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/SessionSummary.java b/src/java/org/apache/cassandra/streaming/SessionSummary.java
index d52c2ca..cf63a57 100644
--- a/src/java/org/apache/cassandra/streaming/SessionSummary.java
+++ b/src/java/org/apache/cassandra/streaming/SessionSummary.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.streaming;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -28,19 +27,19 @@ import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.serializers.InetAddressSerializer;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
public class SessionSummary
{
- public final InetAddress coordinator;
- public final InetAddress peer;
+ public final InetAddressAndPort coordinator;
+ public final InetAddressAndPort peer;
/** Immutable collection of receiving summaries */
public final Collection<StreamSummary> receivingSummaries;
/** Immutable collection of sending summaries*/
public final Collection<StreamSummary> sendingSummaries;
- public SessionSummary(InetAddress coordinator, InetAddress peer,
+ public SessionSummary(InetAddressAndPort coordinator, InetAddressAndPort peer,
Collection<StreamSummary> receivingSummaries,
Collection<StreamSummary> sendingSummaries)
{
@@ -81,8 +80,8 @@ public class SessionSummary
{
public void serialize(SessionSummary summary, DataOutputPlus out, int version) throws IOException
{
- ByteBufferUtil.writeWithLength(InetAddressSerializer.instance.serialize(summary.coordinator), out);
- ByteBufferUtil.writeWithLength(InetAddressSerializer.instance.serialize(summary.peer), out);
+ CompactEndpointSerializationHelper.instance.serialize(summary.coordinator, out, version);
+ CompactEndpointSerializationHelper.instance.serialize(summary.peer, out, version);
out.writeInt(summary.receivingSummaries.size());
for (StreamSummary streamSummary: summary.receivingSummaries)
@@ -99,8 +98,8 @@ public class SessionSummary
public SessionSummary deserialize(DataInputPlus in, int version) throws IOException
{
- InetAddress coordinator = InetAddressSerializer.instance.deserialize(ByteBufferUtil.readWithLength(in));
- InetAddress peer = InetAddressSerializer.instance.deserialize(ByteBufferUtil.readWithLength(in));
+ InetAddressAndPort coordinator = CompactEndpointSerializationHelper.instance.deserialize(in, version);
+ InetAddressAndPort peer = CompactEndpointSerializationHelper.instance.deserialize(in, version);
int numRcvd = in.readInt();
List<StreamSummary> receivingSummaries = new ArrayList<>(numRcvd);
@@ -122,8 +121,8 @@ public class SessionSummary
public long serializedSize(SessionSummary summary, int version)
{
long size = 0;
- size += ByteBufferUtil.serializedSizeWithLength(InetAddressSerializer.instance.serialize(summary.coordinator));
- size += ByteBufferUtil.serializedSizeWithLength(InetAddressSerializer.instance.serialize(summary.peer));
+ size += CompactEndpointSerializationHelper.instance.serializedSize(summary.coordinator, version);
+ size += CompactEndpointSerializationHelper.instance.serializedSize(summary.peer, version);
size += TypeSizes.sizeof(summary.receivingSummaries.size());
for (StreamSummary streamSummary: summary.receivingSummaries)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index bb8c702..a22e07d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -17,13 +17,13 @@
*/
package org.apache.cassandra.streaming;
-import java.net.InetAddress;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -45,7 +45,7 @@ public class StreamCoordinator
FBUtilities.getAvailableProcessors());
private final boolean connectSequentially;
- private Map<InetAddress, HostStreamingData> peerSessions = new HashMap<>();
+ private Map<InetAddressAndPort, HostStreamingData> peerSessions = new HashMap<>();
private final int connectionsPerHost;
private StreamConnectionFactory factory;
private final boolean keepSSTableLevel;
@@ -143,29 +143,29 @@ public class StreamCoordinator
if (sessionsToConnect.hasNext())
{
StreamSession next = sessionsToConnect.next();
- logger.debug("Connecting next session {} with {}.", next.planId(), next.peer.getHostAddress());
+ logger.debug("Connecting next session {} with {}.", next.planId(), next.peer.toString());
streamExecutor.execute(new StreamSessionConnector(next));
}
else
logger.debug("Finished connecting all sessions");
}
- public synchronized Set<InetAddress> getPeers()
+ public synchronized Set<InetAddressAndPort> getPeers()
{
return new HashSet<>(peerSessions.keySet());
}
- public synchronized StreamSession getOrCreateNextSession(InetAddress peer, InetAddress connecting)
+ public synchronized StreamSession getOrCreateNextSession(InetAddressAndPort peer, InetAddressAndPort connecting)
{
return getOrCreateHostData(peer).getOrCreateNextSession(peer, connecting);
}
- public synchronized StreamSession getOrCreateSessionById(InetAddress peer, int id, InetAddress connecting)
+ public synchronized StreamSession getOrCreateSessionById(InetAddressAndPort peer, int id, InetAddressAndPort connecting)
{
return getOrCreateHostData(peer).getOrCreateSessionById(peer, id, connecting);
}
- public StreamSession getSessionById(InetAddress peer, int id)
+ public StreamSession getSessionById(InetAddressAndPort peer, int id)
{
return getHostData(peer).getSessionById(id);
}
@@ -191,7 +191,7 @@ public class StreamCoordinator
return result;
}
- public synchronized void transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+ public synchronized void transferFiles(InetAddressAndPort to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
{
HostStreamingData sessionList = getOrCreateHostData(to);
@@ -239,7 +239,7 @@ public class StreamCoordinator
return result;
}
- private HostStreamingData getHostData(InetAddress peer)
+ private HostStreamingData getHostData(InetAddressAndPort peer)
{
HostStreamingData data = peerSessions.get(peer);
if (data == null)
@@ -247,7 +247,7 @@ public class StreamCoordinator
return data;
}
- private HostStreamingData getOrCreateHostData(InetAddress peer)
+ private HostStreamingData getOrCreateHostData(InetAddressAndPort peer)
{
HostStreamingData data = peerSessions.get(peer);
if (data == null)
@@ -297,7 +297,7 @@ public class StreamCoordinator
return false;
}
- public StreamSession getOrCreateNextSession(InetAddress peer, InetAddress connecting)
+ public StreamSession getOrCreateNextSession(InetAddressAndPort peer, InetAddressAndPort connecting)
{
// create
if (streamSessions.size() < connectionsPerHost)
@@ -329,7 +329,7 @@ public class StreamCoordinator
return Collections.unmodifiableCollection(streamSessions.values());
}
- public StreamSession getOrCreateSessionById(InetAddress peer, int id, InetAddress connecting)
+ public StreamSession getOrCreateSessionById(InetAddressAndPort peer, int id, InetAddressAndPort connecting)
{
StreamSession session = streamSessions.get(id);
if (session == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/StreamEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamEvent.java b/src/java/org/apache/cassandra/streaming/StreamEvent.java
index 6ea2814..7ecd081 100644
--- a/src/java/org/apache/cassandra/streaming/StreamEvent.java
+++ b/src/java/org/apache/cassandra/streaming/StreamEvent.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.streaming;
-import java.net.InetAddress;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@@ -27,6 +26,7 @@ import com.google.common.collect.ImmutableSet;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
public abstract class StreamEvent
{
@@ -48,7 +48,7 @@ public abstract class StreamEvent
public static class SessionCompleteEvent extends StreamEvent
{
- public final InetAddress peer;
+ public final InetAddressAndPort peer;
public final boolean success;
public final int sessionIndex;
public final Set<StreamRequest> requests;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
index a44f02e..81c65c5 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.streaming;
-import java.net.InetAddress;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -35,6 +34,7 @@ import com.google.common.util.concurrent.RateLimiter;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.management.StreamEventJMXNotifier;
import org.apache.cassandra.streaming.management.StreamStateCompositeData;
@@ -55,7 +55,7 @@ public class StreamManager implements StreamManagerMBean
*
* @return StreamRateLimiter with rate limit set based on peer location.
*/
- public static StreamRateLimiter getRateLimiter(InetAddress peer)
+ public static StreamRateLimiter getRateLimiter(InetAddressAndPort peer)
{
return new StreamRateLimiter(peer);
}
@@ -67,7 +67,7 @@ public class StreamManager implements StreamManagerMBean
private static final RateLimiter interDCLimiter = RateLimiter.create(Double.MAX_VALUE);
private final boolean isLocalDC;
- public StreamRateLimiter(InetAddress peer)
+ public StreamRateLimiter(InetAddressAndPort peer)
{
double throughput = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT;
mayUpdateThroughput(throughput, limiter);
@@ -176,7 +176,7 @@ public class StreamManager implements StreamManagerMBean
return notifier.getNotificationInfo();
}
- public StreamSession findSession(InetAddress peer, UUID planId, int sessionIndex)
+ public StreamSession findSession(InetAddressAndPort peer, UUID planId, int sessionIndex)
{
StreamSession session = findSession(initiatedStreams, peer, planId, sessionIndex);
if (session != null)
@@ -185,7 +185,7 @@ public class StreamManager implements StreamManagerMBean
return findSession(receivingStreams, peer, planId, sessionIndex);
}
- private StreamSession findSession(Map<UUID, StreamResultFuture> streams, InetAddress peer, UUID planId, int sessionIndex)
+ private StreamSession findSession(Map<UUID, StreamResultFuture> streams, InetAddressAndPort peer, UUID planId, int sessionIndex)
{
StreamResultFuture streamResultFuture = streams.get(planId);
if (streamResultFuture == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 213f74b..43e9068 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -17,11 +17,11 @@
*/
package org.apache.cassandra.streaming;
-import java.net.InetAddress;
import java.util.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.UUIDGen;
import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
@@ -73,7 +73,7 @@ public class StreamPlan
* @param ranges ranges to fetch
* @return this object for chaining
*/
- public StreamPlan requestRanges(InetAddress from, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges)
+ public StreamPlan requestRanges(InetAddressAndPort from, InetAddressAndPort connecting, String keyspace, Collection<Range<Token>> ranges)
{
return requestRanges(from, connecting, keyspace, ranges, EMPTY_COLUMN_FAMILIES);
}
@@ -88,7 +88,7 @@ public class StreamPlan
* @param columnFamilies specific column families
* @return this object for chaining
*/
- public StreamPlan requestRanges(InetAddress from, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
+ public StreamPlan requestRanges(InetAddressAndPort from, InetAddressAndPort connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
{
StreamSession session = coordinator.getOrCreateNextSession(from, connecting);
session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies));
@@ -98,9 +98,9 @@ public class StreamPlan
/**
* Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}.
*
- * @see #transferRanges(java.net.InetAddress, java.net.InetAddress, String, java.util.Collection, String...)
+ * @see #transferRanges(InetAddressAndPort, InetAddressAndPort, String, java.util.Collection, String...)
*/
- public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
+ public StreamPlan transferRanges(InetAddressAndPort to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
{
return transferRanges(to, to, keyspace, ranges, columnFamilies);
}
@@ -114,7 +114,7 @@ public class StreamPlan
* @param ranges ranges to send
* @return this object for chaining
*/
- public StreamPlan transferRanges(InetAddress to, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges)
+ public StreamPlan transferRanges(InetAddressAndPort to, InetAddressAndPort connecting, String keyspace, Collection<Range<Token>> ranges)
{
return transferRanges(to, connecting, keyspace, ranges, EMPTY_COLUMN_FAMILIES);
}
@@ -129,7 +129,7 @@ public class StreamPlan
* @param columnFamilies specific column families
* @return this object for chaining
*/
- public StreamPlan transferRanges(InetAddress to, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
+ public StreamPlan transferRanges(InetAddressAndPort to, InetAddressAndPort connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
{
StreamSession session = coordinator.getOrCreateNextSession(to, connecting);
session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer);
@@ -144,7 +144,7 @@ public class StreamPlan
* this collection will be modified to remove those files that are successfully handed off
* @return this object for chaining
*/
- public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+ public StreamPlan transferFiles(InetAddressAndPort to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
{
coordinator.transferFiles(to, sstableDetails);
return this;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 0f74c7f..544f37f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -103,7 +104,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
public static synchronized StreamResultFuture initReceivingSide(int sessionIndex,
UUID planId,
StreamOperation streamOperation,
- InetAddress from,
+ InetAddressAndPort from,
Channel channel,
boolean keepSSTableLevel,
UUID pendingRepair,
@@ -135,11 +136,15 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
return coordinator;
}
- private void attachConnection(InetAddress from, int sessionIndex, Channel channel)
+ private void attachConnection(InetAddressAndPort from, int sessionIndex, Channel channel)
{
SocketAddress addr = channel.remoteAddress();
- InetAddress connecting = (addr instanceof InetSocketAddress ? ((InetSocketAddress) addr).getAddress() : from);
- StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, connecting);
+ //In the case of unit tests, if you use the EmbeddedChannel, channel.remoteAddress()
+ //does not return an InetSocketAddress, but an EmbeddedSocketAddress. Hence why we need the type check here
+ InetAddress connecting = (addr instanceof InetSocketAddress ? ((InetSocketAddress) addr).getAddress() : from.address);
+ //Need to turn connecting into an InetAddressAndPort with the correct port. Getting the port from "from"
+ //will work since we don't actually have ports diverging across network interfaces
+ StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, InetAddressAndPort.getByAddressOverrideDefaults(connecting, from.port));
session.init(this);
session.attach(channel);
}
@@ -228,7 +233,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
}
}
- StreamSession getSession(InetAddress peer, int sessionIndex)
+ StreamSession getSession(InetAddressAndPort peer, int sessionIndex)
{
return coordinator.getSessionById(peer, sessionIndex);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index b6351f9..4085c43 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.*;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
@@ -142,14 +143,14 @@ public class StreamSession implements IEndpointStateChangeSubscriber
/**
* Streaming endpoint.
*
- * Each {@code StreamSession} is identified by this InetAddress which is broadcast address of the node streaming.
+ * Each {@code StreamSession} is identified by this InetAddressAndPort, which is the broadcast address of the streaming node.
*/
- public final InetAddress peer;
+ public final InetAddressAndPort peer;
private final int index;
/** Actual connecting address. Can be the same as {@linkplain #peer}. */
- public final InetAddress connecting;
+ public final InetAddressAndPort connecting;
// should not be null when session is started
private StreamResultFuture streamResult;
@@ -191,14 +192,14 @@ public class StreamSession implements IEndpointStateChangeSubscriber
* @param peer Address of streaming peer
* @param connecting Actual connecting address
*/
- public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind)
+ public StreamSession(InetAddressAndPort peer, InetAddressAndPort connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind)
{
this.peer = peer;
this.connecting = connecting;
this.index = index;
- OutboundConnectionIdentifier id = OutboundConnectionIdentifier.stream(new InetSocketAddress(FBUtilities.getBroadcastAddress(), 0),
- new InetSocketAddress(connecting, MessagingService.instance().portFor(connecting)));
+ OutboundConnectionIdentifier id = OutboundConnectionIdentifier.stream(InetAddressAndPort.getByAddressOverrideDefaults(FBUtilities.getBroadcastAddressAndPort().address, 0),
+ InetAddressAndPort.getByAddressOverrideDefaults(connecting.address, MessagingService.instance().portFor(connecting)));
this.messageSender = new NettyStreamingMessageSender(this, id, factory, StreamMessage.CURRENT_VERSION, previewKind.isPreview());
this.metrics = StreamingMetrics.get(connecting);
this.keepSSTableLevel = keepSSTableLevel;
@@ -607,16 +608,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber
{
logger.error("[Stream #{}] Did not receive response from peer {}{} for {} secs. Is peer down? " +
"If not, maybe try increasing streaming_keep_alive_period_in_secs.", planId(),
- peer.getHostAddress(),
- peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(),
+ peer.getHostAddress(true),
+ peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(true),
2 * DatabaseDescriptor.getStreamingKeepAlivePeriod(),
e);
}
else
{
logger.error("[Stream #{}] Streaming error occurred on session with peer {}{}", planId(),
- peer.getHostAddress(),
- peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(),
+ peer.getHostAddress(true),
+ peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(true),
e);
}
}
@@ -644,7 +645,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
prepareReceiving(summary);
PrepareSynAckMessage prepareSynAck = new PrepareSynAckMessage();
- if (!peer.equals(FBUtilities.getBroadcastAddress()))
+ if (!peer.equals(FBUtilities.getBroadcastAddressAndPort()))
for (StreamTransferTask task : transfers.values())
prepareSynAck.summaries.add(task.getSummary());
messageSender.sendMessage(prepareSynAck);
@@ -754,7 +755,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
*/
public synchronized void sessionFailed()
{
- logger.error("[Stream #{}] Remote peer {} failed stream session.", planId(), peer.getHostAddress());
+ logger.error("[Stream #{}] Remote peer {} failed stream session.", planId(), peer.toString());
closeSession(State.FAILED);
}
@@ -784,21 +785,21 @@ public class StreamSession implements IEndpointStateChangeSubscriber
maybeCompleted();
}
- public void onJoin(InetAddress endpoint, EndpointState epState) {}
- public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
- public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
- public void onAlive(InetAddress endpoint, EndpointState state) {}
- public void onDead(InetAddress endpoint, EndpointState state) {}
+ public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {}
+ public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+ public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {}
+ public void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
+ public void onDead(InetAddressAndPort endpoint, EndpointState state) {}
- public void onRemove(InetAddress endpoint)
+ public void onRemove(InetAddressAndPort endpoint)
{
- logger.error("[Stream #{}] Session failed because remote peer {} has left.", planId(), peer.getHostAddress());
+ logger.error("[Stream #{}] Session failed because remote peer {} has left.", planId(), peer.toString());
closeSession(State.FAILED);
}
- public void onRestart(InetAddress endpoint, EndpointState epState)
+ public void onRestart(InetAddressAndPort endpoint, EndpointState epState)
{
- logger.error("[Stream #{}] Session failed because remote peer {} was restarted.", planId(), peer.getHostAddress());
+ logger.error("[Stream #{}] Session failed because remote peer {} was restarted.", planId(), peer.toString());
closeSession(State.FAILED);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
index 0b38760..20b7c87 100644
--- a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
+++ b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
@@ -137,7 +137,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender
@Override
public void initialize() throws IOException
{
- StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(),
+ StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddressAndPort(),
session.sessionIndex(),
session.planId(),
session.streamOperation(),
@@ -183,7 +183,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender
{
Channel channel = factory.createConnection(connectionId, protocolVersion);
ChannelPipeline pipeline = channel.pipeline();
- pipeline.addLast(NettyFactory.instance.streamingGroup, NettyFactory.INBOUND_STREAM_HANDLER_NAME, new StreamingInboundHandler(connectionId.remoteAddress(), protocolVersion, session));
+ pipeline.addLast(NettyFactory.instance.streamingGroup, NettyFactory.INBOUND_STREAM_HANDLER_NAME, new StreamingInboundHandler(connectionId.remote(), protocolVersion, session));
channel.attr(TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
return channel;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
index cc6f9e0..907572b 100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.streaming.async;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -38,6 +37,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocalThread;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.streaming.StreamReceiveException;
@@ -65,7 +65,7 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
private static final int AUTO_READ_LOW_WATER_MARK = 1 << 15;
private static final int AUTO_READ_HIGH_WATER_MARK = 1 << 16;
- private final InetSocketAddress remoteAddress;
+ private final InetAddressAndPort remoteAddress;
private final int protocolVersion;
private final StreamSession session;
@@ -82,7 +82,7 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
private volatile boolean closed;
- public StreamingInboundHandler(InetSocketAddress remoteAddress, int protocolVersion, @Nullable StreamSession session)
+ public StreamingInboundHandler(InetAddressAndPort remoteAddress, int protocolVersion, @Nullable StreamSession session)
{
this.remoteAddress = remoteAddress;
this.protocolVersion = protocolVersion;
@@ -254,11 +254,11 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
*/
static class SessionIdentifier
{
- final InetAddress from;
+ final InetAddressAndPort from;
final UUID planId;
final int sessionIndex;
- SessionIdentifier(InetAddress from, UUID planId, int sessionIndex)
+ SessionIdentifier(InetAddressAndPort from, UUID planId, int sessionIndex)
{
this.from = from;
this.planId = planId;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
index b9e6951..964fe10 100644
--- a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
+++ b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.streaming.management;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
@@ -26,12 +25,14 @@ import javax.management.openmbean.*;
import com.google.common.base.Throwables;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.ProgressInfo;
public class ProgressInfoCompositeData
{
private static final String[] ITEM_NAMES = new String[]{"planId",
"peer",
+ "peer storage port",
"sessionIndex",
"fileName",
"direction",
@@ -39,6 +40,7 @@ public class ProgressInfoCompositeData
"totalBytes"};
private static final String[] ITEM_DESCS = new String[]{"String representation of Plan ID",
"Session peer",
+ "Session peer storage port",
"Index of session",
"Name of the file",
"Direction('IN' or 'OUT')",
@@ -47,6 +49,7 @@ public class ProgressInfoCompositeData
private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
SimpleType.STRING,
SimpleType.INTEGER,
+ SimpleType.INTEGER,
SimpleType.STRING,
SimpleType.STRING,
SimpleType.LONG,
@@ -73,12 +76,13 @@ public class ProgressInfoCompositeData
{
Map<String, Object> valueMap = new HashMap<>();
valueMap.put(ITEM_NAMES[0], planId.toString());
- valueMap.put(ITEM_NAMES[1], progressInfo.peer.getHostAddress());
- valueMap.put(ITEM_NAMES[2], progressInfo.sessionIndex);
- valueMap.put(ITEM_NAMES[3], progressInfo.fileName);
- valueMap.put(ITEM_NAMES[4], progressInfo.direction.name());
- valueMap.put(ITEM_NAMES[5], progressInfo.currentBytes);
- valueMap.put(ITEM_NAMES[6], progressInfo.totalBytes);
+ valueMap.put(ITEM_NAMES[1], progressInfo.peer.address.getHostAddress());
+ valueMap.put(ITEM_NAMES[2], progressInfo.peer.port);
+ valueMap.put(ITEM_NAMES[3], progressInfo.sessionIndex);
+ valueMap.put(ITEM_NAMES[4], progressInfo.fileName);
+ valueMap.put(ITEM_NAMES[5], progressInfo.direction.name());
+ valueMap.put(ITEM_NAMES[6], progressInfo.currentBytes);
+ valueMap.put(ITEM_NAMES[7], progressInfo.totalBytes);
try
{
return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
@@ -94,12 +98,12 @@ public class ProgressInfoCompositeData
Object[] values = cd.getAll(ITEM_NAMES);
try
{
- return new ProgressInfo(InetAddress.getByName((String) values[1]),
- (int) values[2],
- (String) values[3],
- ProgressInfo.Direction.valueOf((String)values[4]),
- (long) values[5],
- (long) values[6]);
+ return new ProgressInfo(InetAddressAndPort.getByNameOverrideDefaults((String) values[1], (Integer)values[2]),
+ (int) values[3],
+ (String) values[4],
+ ProgressInfo.Direction.valueOf((String)values[5]),
+ (long) values[6],
+ (long) values[7]);
}
catch (UnknownHostException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
index 516582a..1c0d8c5 100644
--- a/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
+++ b/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
@@ -29,12 +29,15 @@ public class SessionCompleteEventCompositeData
{
private static final String[] ITEM_NAMES = new String[]{"planId",
"peer",
+ "peer storage port",
"success"};
private static final String[] ITEM_DESCS = new String[]{"Plan ID",
"Session peer",
+ "Session peer storage port",
"Indicates whether session was successful"};
private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
SimpleType.STRING,
+ SimpleType.INTEGER,
SimpleType.BOOLEAN};
public static final CompositeType COMPOSITE_TYPE;
@@ -58,8 +61,9 @@ public class SessionCompleteEventCompositeData
{
Map<String, Object> valueMap = new HashMap<>();
valueMap.put(ITEM_NAMES[0], event.planId.toString());
- valueMap.put(ITEM_NAMES[1], event.peer.getHostAddress());
- valueMap.put(ITEM_NAMES[2], event.success);
+ valueMap.put(ITEM_NAMES[1], event.peer.address.getHostAddress());
+ valueMap.put(ITEM_NAMES[2], event.peer.port);
+ valueMap.put(ITEM_NAMES[3], event.success);
try
{
return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
index a6762a8..d20eaf5 100644
--- a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
+++ b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.streaming.management;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import javax.management.openmbean.*;
@@ -27,6 +26,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamSession;
@@ -36,7 +36,9 @@ public class SessionInfoCompositeData
{
private static final String[] ITEM_NAMES = new String[]{"planId",
"peer",
+ "peer_port",
"connecting",
+ "connecting_port",
"receivingSummaries",
"sendingSummaries",
"state",
@@ -45,7 +47,9 @@ public class SessionInfoCompositeData
"sessionIndex"};
private static final String[] ITEM_DESCS = new String[]{"Plan ID",
"Session peer",
+ "Session peer storage port",
"Connecting address",
+ "Connecting storage port",
"Summaries of receiving data",
"Summaries of sending data",
"Current session state",
@@ -61,7 +65,9 @@ public class SessionInfoCompositeData
{
ITEM_TYPES = new OpenType[]{SimpleType.STRING,
SimpleType.STRING,
+ SimpleType.INTEGER,
SimpleType.STRING,
+ SimpleType.INTEGER,
ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
SimpleType.STRING,
@@ -84,8 +90,10 @@ public class SessionInfoCompositeData
{
Map<String, Object> valueMap = new HashMap<>();
valueMap.put(ITEM_NAMES[0], planId.toString());
- valueMap.put(ITEM_NAMES[1], sessionInfo.peer.getHostAddress());
- valueMap.put(ITEM_NAMES[2], sessionInfo.connecting.getHostAddress());
+ valueMap.put(ITEM_NAMES[1], sessionInfo.peer.address.getHostAddress());
+ valueMap.put(ITEM_NAMES[2], sessionInfo.peer.port);
+ valueMap.put(ITEM_NAMES[3], sessionInfo.connecting.address.getHostAddress());
+ valueMap.put(ITEM_NAMES[4], sessionInfo.connecting.port);
Function<StreamSummary, CompositeData> fromStreamSummary = new Function<StreamSummary, CompositeData>()
{
public CompositeData apply(StreamSummary input)
@@ -93,9 +101,9 @@ public class SessionInfoCompositeData
return StreamSummaryCompositeData.toCompositeData(input);
}
};
- valueMap.put(ITEM_NAMES[3], toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary));
- valueMap.put(ITEM_NAMES[4], toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary));
- valueMap.put(ITEM_NAMES[5], sessionInfo.state.name());
+ valueMap.put(ITEM_NAMES[5], toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary));
+ valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary));
+ valueMap.put(ITEM_NAMES[7], sessionInfo.state.name());
Function<ProgressInfo, CompositeData> fromProgressInfo = new Function<ProgressInfo, CompositeData>()
{
public CompositeData apply(ProgressInfo input)
@@ -103,9 +111,9 @@ public class SessionInfoCompositeData
return ProgressInfoCompositeData.toCompositeData(planId, input);
}
};
- valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo));
- valueMap.put(ITEM_NAMES[7], toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo));
- valueMap.put(ITEM_NAMES[8], sessionInfo.sessionIndex);
+ valueMap.put(ITEM_NAMES[8], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo));
+ valueMap.put(ITEM_NAMES[9], toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo));
+ valueMap.put(ITEM_NAMES[10], sessionInfo.sessionIndex);
try
{
return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
@@ -121,11 +129,11 @@ public class SessionInfoCompositeData
assert cd.getCompositeType().equals(COMPOSITE_TYPE);
Object[] values = cd.getAll(ITEM_NAMES);
- InetAddress peer, connecting;
+ InetAddressAndPort peer, connecting;
try
{
- peer = InetAddress.getByName((String) values[1]);
- connecting = InetAddress.getByName((String) values[2]);
+ peer = InetAddressAndPort.getByNameOverrideDefaults((String) values[1], (Integer)values[2]);
+ connecting = InetAddressAndPort.getByNameOverrideDefaults((String) values[3], (Integer)values[4]);
}
catch (UnknownHostException e)
{
@@ -139,11 +147,11 @@ public class SessionInfoCompositeData
}
};
SessionInfo info = new SessionInfo(peer,
- (int)values[8],
+ (int)values[10],
connecting,
- fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
- fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary),
- StreamSession.State.valueOf((String) values[5]));
+ fromArrayOfCompositeData((CompositeData[]) values[5], toStreamSummary),
+ fromArrayOfCompositeData((CompositeData[]) values[6], toStreamSummary),
+ StreamSession.State.valueOf((String) values[7]));
Function<CompositeData, ProgressInfo> toProgressInfo = new Function<CompositeData, ProgressInfo>()
{
public ProgressInfo apply(CompositeData input)
@@ -151,11 +159,11 @@ public class SessionInfoCompositeData
return ProgressInfoCompositeData.fromCompositeData(input);
}
};
- for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[6], toProgressInfo))
+ for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[8], toProgressInfo))
{
info.updateProgress(progress);
}
- for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[7], toProgressInfo))
+ for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[9], toProgressInfo))
{
info.updateProgress(progress);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index fedb971..13a3358 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.streaming.messages;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -30,6 +29,7 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableId;
@@ -67,13 +67,13 @@ public class FileMessageHeader
public final UUID pendingRepair;
public final int sstableLevel;
public final SerializationHeader.Component header;
- public final InetAddress sender;
+ public final InetAddressAndPort sender;
/* cached size value */
private transient final long size;
private FileMessageHeader(TableId tableId,
- InetAddress sender,
+ InetAddressAndPort sender,
UUID planId,
int sessionIndex,
int sequenceNumber,
@@ -106,7 +106,7 @@ public class FileMessageHeader
}
public FileMessageHeader(TableId tableId,
- InetAddress sender,
+ InetAddressAndPort sender,
UUID planId,
int sessionIndex,
int sequenceNumber,
@@ -218,7 +218,7 @@ public class FileMessageHeader
public CompressionInfo serialize(FileMessageHeader header, DataOutputPlus out, int version) throws IOException
{
header.tableId.serialize(out);
- CompactEndpointSerializationHelper.serialize(header.sender, out);
+ CompactEndpointSerializationHelper.streamingInstance.serialize(header.sender, out, version);
UUIDSerializer.serializer.serialize(header.planId, out, version);
out.writeInt(header.sessionIndex);
out.writeInt(header.sequenceNumber);
@@ -252,7 +252,7 @@ public class FileMessageHeader
public FileMessageHeader deserialize(DataInputPlus in, int version) throws IOException
{
TableId tableId = TableId.deserialize(in);
- InetAddress sender = CompactEndpointSerializationHelper.deserialize(in);
+ InetAddressAndPort sender = CompactEndpointSerializationHelper.streamingInstance.deserialize(in, version);
UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
int sessionIndex = in.readInt();
int sequenceNumber = in.readInt();
@@ -276,7 +276,7 @@ public class FileMessageHeader
public long serializedSize(FileMessageHeader header, int version)
{
long size = header.tableId.serializedSize();
- size += CompactEndpointSerializationHelper.serializedSize(header.sender);
+ size += CompactEndpointSerializationHelper.streamingInstance.serializedSize(header.sender, version);
size += UUIDSerializer.serializer.serializedSize(header.planId, version);
size += TypeSizes.sizeof(header.sessionIndex);
size += TypeSizes.sizeof(header.sequenceNumber);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index f44b41c..8bbcc05 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -79,7 +79,7 @@ public class OutgoingFileMessage extends StreamMessage
SSTableReader sstable = ref.get();
filename = sstable.getFilename();
this.header = new FileMessageHeader(sstable.metadata().id,
- FBUtilities.getBroadcastAddress(),
+ FBUtilities.getBroadcastAddressAndPort(),
session.planId(),
session.sessionIndex(),
sequenceNumber,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 68c6034..fced133 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -18,12 +18,12 @@
package org.apache.cassandra.streaming.messages;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.UUID;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.StreamOperation;
@@ -39,7 +39,7 @@ public class StreamInitMessage extends StreamMessage
{
public static Serializer<StreamInitMessage> serializer = new StreamInitMessageSerializer();
- public final InetAddress from;
+ public final InetAddressAndPort from;
public final int sessionIndex;
public final UUID planId;
public final StreamOperation streamOperation;
@@ -48,7 +48,7 @@ public class StreamInitMessage extends StreamMessage
public final UUID pendingRepair;
public final PreviewKind previewKind;
- public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind)
+ public StreamInitMessage(InetAddressAndPort from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind)
{
super(Type.STREAM_INIT);
this.from = from;
@@ -73,7 +73,7 @@ public class StreamInitMessage extends StreamMessage
{
public void serialize(StreamInitMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
- CompactEndpointSerializationHelper.serialize(message.from, out);
+ CompactEndpointSerializationHelper.streamingInstance.serialize(message.from, out, version);
out.writeInt(message.sessionIndex);
UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version);
out.writeUTF(message.streamOperation.getDescription());
@@ -89,7 +89,7 @@ public class StreamInitMessage extends StreamMessage
public StreamInitMessage deserialize(DataInputPlus in, int version, StreamSession session) throws IOException
{
- InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
+ InetAddressAndPort from = CompactEndpointSerializationHelper.streamingInstance.deserialize(in, version);
int sessionIndex = in.readInt();
UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
String description = in.readUTF();
@@ -102,7 +102,7 @@ public class StreamInitMessage extends StreamMessage
public long serializedSize(StreamInitMessage message, int version)
{
- long size = CompactEndpointSerializationHelper.serializedSize(message.from);
+ long size = CompactEndpointSerializationHelper.streamingInstance.serializedSize(message.from, version);
size += TypeSizes.sizeof(message.sessionIndex);
size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version);
size += TypeSizes.sizeof(message.streamOperation.getDescription());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
index b56d292..cce686f 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
@@ -22,21 +22,21 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import io.netty.channel.Channel;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.StreamConnectionFactory;
public class BulkLoadConnectionFactory extends DefaultConnectionFactory implements StreamConnectionFactory
{
private final boolean outboundBindAny;
- private final int storagePort;
private final int secureStoragePort;
private final EncryptionOptions.ServerEncryptionOptions encryptionOptions;
- public BulkLoadConnectionFactory(int storagePort, int secureStoragePort, EncryptionOptions.ServerEncryptionOptions encryptionOptions, boolean outboundBindAny)
+ public BulkLoadConnectionFactory(int secureStoragePort, EncryptionOptions.ServerEncryptionOptions encryptionOptions, boolean outboundBindAny)
{
- this.storagePort = storagePort;
this.secureStoragePort = secureStoragePort;
this.encryptionOptions = encryptionOptions != null && encryptionOptions.internode_encryption == EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none
? null
@@ -50,9 +50,9 @@ public class BulkLoadConnectionFactory extends DefaultConnectionFactory implemen
// When 'all', 'dc' and 'rack', server nodes always have SSL port open, and since thin client like sstableloader
// does not know which node is in which dc/rack, connecting to SSL port is always the option.
int port = encryptionOptions != null && encryptionOptions.internode_encryption != EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none ?
- secureStoragePort : storagePort;
+ secureStoragePort : connectionId.remote().port;
- connectionId = connectionId.withNewConnectionAddress(new InetSocketAddress(connectionId.remote(), port));
+ connectionId = connectionId.withNewConnectionAddress(InetAddressAndPort.getByAddressOverrideDefaults(connectionId.remote().address, port));
return createConnection(connectionId, protocolVersion, encryptionOptions);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 0812e53..545d1f7 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -18,7 +18,7 @@
package org.apache.cassandra.tools;
import java.io.IOException;
-import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.util.Set;
import javax.net.ssl.SSLContext;
@@ -33,6 +33,7 @@ import org.apache.commons.cli.Options;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.streaming.*;
import org.apache.cassandra.utils.FBUtilities;
@@ -57,11 +58,12 @@ public class BulkLoader
new ExternalClient(
options.hosts,
options.nativePort,
- options.authProvider,
options.storagePort,
+ options.authProvider,
options.sslStoragePort,
options.serverEncOptions,
- buildSSLOptions(options.clientEncOptions)),
+ buildSSLOptions(options.clientEncOptions),
+ options.allowServerPortDiscovery),
handler,
options.connectionsPerHost);
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
@@ -124,7 +126,7 @@ public class BulkLoader
private long peak = 0;
private int totalFiles = 0;
- private final Multimap<InetAddress, SessionInfo> sessionsByHost = HashMultimap.create();
+ private final Multimap<InetAddressAndPort, SessionInfo> sessionsByHost = HashMultimap.create();
public ProgressIndicator()
{
@@ -165,7 +167,7 @@ public class BulkLoader
boolean updateTotalFiles = totalFiles == 0;
// recalculate progress across all sessions in all hosts and display
- for (InetAddress peer : sessionsByHost.keySet())
+ for (InetAddressAndPort peer : sessionsByHost.keySet())
{
sb.append("[").append(peer).append("]");
@@ -268,20 +270,19 @@ public class BulkLoader
static class ExternalClient extends NativeSSTableLoaderClient
{
- private final int storagePort;
private final int sslStoragePort;
private final EncryptionOptions.ServerEncryptionOptions serverEncOptions;
- public ExternalClient(Set<InetAddress> hosts,
- int port,
- AuthProvider authProvider,
+ public ExternalClient(Set<InetSocketAddress> hosts,
+ int nativePort,
int storagePort,
+ AuthProvider authProvider,
int sslStoragePort,
EncryptionOptions.ServerEncryptionOptions serverEncryptionOptions,
- SSLOptions sslOptions)
+ SSLOptions sslOptions,
+ boolean allowServerPortDiscovery)
{
- super(hosts, port, authProvider, sslOptions);
- this.storagePort = storagePort;
+ super(hosts, nativePort, storagePort, authProvider, sslOptions, allowServerPortDiscovery);
this.sslStoragePort = sslStoragePort;
serverEncOptions = serverEncryptionOptions;
}
@@ -289,7 +290,7 @@ public class BulkLoader
@Override
public StreamConnectionFactory getConnectionFactory()
{
- return new BulkLoadConnectionFactory(storagePort, sslStoragePort, serverEncOptions, false);
+ return new BulkLoadConnectionFactory(sslStoragePort, serverEncOptions, false);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org