You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2017/03/24 19:27:04 UTC
cassandra git commit: Outbound TCP connections should consult
internode authenticator. Patch by Ariel Weisberg;
Reviewed by Marcus Eriksson for CASSANDRA-13324
Repository: cassandra
Updated Branches:
refs/heads/trunk 60e2e9826 -> 732d1af86
Outbound TCP connections should consult internode authenticator.
Patch by Ariel Weisberg; Reviewed by Marcus Eriksson for CASSANDRA-13324
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/732d1af8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/732d1af8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/732d1af8
Branch: refs/heads/trunk
Commit: 732d1af866b91e5ba63e7e2a467d99d4cb90e11f
Parents: 60e2e98
Author: Ariel Weisberg <aw...@apple.com>
Authored: Fri Mar 24 15:26:50 2017 -0400
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Fri Mar 24 15:26:50 2017 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/auth/AuthConfig.java | 10 +---
.../cassandra/config/DatabaseDescriptor.java | 5 +-
.../locator/ReconnectableSnitchHelper.java | 21 +++++--
.../apache/cassandra/net/MessagingService.java | 44 ++++++++++++--
.../cassandra/net/OutboundTcpConnection.java | 33 +++++++---
.../net/OutboundTcpConnectionPool.java | 9 ++-
.../config/DatabaseDescriptorRefTest.java | 1 +
.../locator/ReconnectableSnitchHelperTest.java | 63 ++++++++++++++++++++
.../cassandra/net/MessagingServiceTest.java | 60 +++++++++++++++++++
10 files changed, 218 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fb9b8c4..b42bde6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324)
* Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360)
* Cleanup ParentRepairSession after repairs (CASSANDRA-13359)
* Incremental repair not streaming correct sstables (CASSANDRA-13328)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/src/java/org/apache/cassandra/auth/AuthConfig.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/AuthConfig.java b/src/java/org/apache/cassandra/auth/AuthConfig.java
index c389ae4..2ca1522 100644
--- a/src/java/org/apache/cassandra/auth/AuthConfig.java
+++ b/src/java/org/apache/cassandra/auth/AuthConfig.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.utils.FBUtilities;
+import org.hsqldb.Database;
/**
* Only purpose is to Initialize authentication/authorization via {@link #applyAuth()}.
@@ -94,13 +95,8 @@ public final class AuthConfig
// authenticator
- IInternodeAuthenticator internodeAuthenticator;
if (conf.internode_authenticator != null)
- internodeAuthenticator = FBUtilities.construct(conf.internode_authenticator, "internode_authenticator");
- else
- internodeAuthenticator = new AllowAllInternodeAuthenticator();
-
- DatabaseDescriptor.setInternodeAuthenticator(internodeAuthenticator);
+ DatabaseDescriptor.setInternodeAuthenticator(FBUtilities.construct(conf.internode_authenticator, "internode_authenticator"));
// Validate at last to have authenticator, authorizer, role-manager and internode-auth setup
// in case these rely on each other.
@@ -108,6 +104,6 @@ public final class AuthConfig
authenticator.validateConfiguration();
authorizer.validateConfiguration();
roleManager.validateConfiguration();
- internodeAuthenticator.validateConfiguration();
+ DatabaseDescriptor.getInternodeAuthenticator().validateConfiguration();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4fb742c..465cd8a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -29,6 +29,7 @@ import java.nio.file.Paths;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
@@ -36,6 +37,7 @@ import com.google.common.primitives.Longs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.auth.AllowAllInternodeAuthenticator;
import org.apache.cassandra.auth.AuthConfig;
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.auth.IAuthorizer;
@@ -79,7 +81,7 @@ public class DatabaseDescriptor
private static InetAddress rpcAddress;
private static InetAddress broadcastRpcAddress;
private static SeedProvider seedProvider;
- private static IInternodeAuthenticator internodeAuthenticator;
+ private static IInternodeAuthenticator internodeAuthenticator = new AllowAllInternodeAuthenticator();
/* Hashing strategy Random or OPHF */
private static IPartitioner partitioner;
@@ -1538,6 +1540,7 @@ public class DatabaseDescriptor
public static void setInternodeAuthenticator(IInternodeAuthenticator internodeAuthenticator)
{
+ Preconditions.checkNotNull(internodeAuthenticator);
DatabaseDescriptor.internodeAuthenticator = internodeAuthenticator;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
index a6bec0c..08f0a14 100644
--- a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
+++ b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
@@ -21,8 +21,12 @@ package org.apache.cassandra.locator;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.gms.*;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.OutboundTcpConnectionPool;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +53,7 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber
{
try
{
- reconnect(publicAddress, InetAddress.getByName(localAddressValue.value));
+ reconnect(publicAddress, InetAddress.getByName(localAddressValue.value), snitch, localDc);
}
catch (UnknownHostException e)
{
@@ -57,12 +61,21 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber
}
}
- private void reconnect(InetAddress publicAddress, InetAddress localAddress)
+ @VisibleForTesting
+ static void reconnect(InetAddress publicAddress, InetAddress localAddress, IEndpointSnitch snitch, String localDc)
{
+ OutboundTcpConnectionPool cp = MessagingService.instance().getConnectionPool(publicAddress);
+ //InternodeAuthenticator said don't connect
+ if (cp == null)
+ {
+ logger.debug("InternodeAuthenticator said don't reconnect to {} on {}", publicAddress, localAddress);
+ return;
+ }
+
if (snitch.getDatacenter(publicAddress).equals(localDc)
- && !MessagingService.instance().getConnectionPool(publicAddress).endPoint().equals(localAddress))
+ && !cp.endPoint().equals(localAddress))
{
- MessagingService.instance().getConnectionPool(publicAddress).reset(localAddress);
+ cp.reset(localAddress);
logger.debug("Initiated reconnect to an Internal IP {} for the {}", localAddress, publicAddress);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 729c042..55604d0 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -416,7 +416,8 @@ public final class MessagingService implements MessagingServiceMBean
/* Lookup table for registering message handlers based on the verb. */
private final Map<Verb, IVerbHandler> verbHandlers;
- private final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<>();
+ @VisibleForTesting
+ final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<>();
private static final Logger logger = LoggerFactory.getLogger(MessagingService.class);
private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000;
@@ -531,6 +532,10 @@ public final class MessagingService implements MessagingServiceMBean
maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout);
ConnectionMetrics.totalTimeouts.mark();
+ OutboundTcpConnectionPool cp = getConnectionPool(expiredCallbackInfo.target);
+ if (cp != null)
+ cp.incrementTimeout();
+
getConnectionPool(expiredCallbackInfo.target).incrementTimeout();
if (expiredCallbackInfo.callback.supportsBackPressure())
@@ -670,8 +675,16 @@ public final class MessagingService implements MessagingServiceMBean
*/
public void convict(InetAddress ep)
{
- logger.trace("Resetting pool for {}", ep);
- getConnectionPool(ep).reset();
+ OutboundTcpConnectionPool cp = getConnectionPool(ep);
+ if (cp != null)
+ {
+ logger.trace("Resetting pool for {}", ep);
+ getConnectionPool(ep).reset();
+ }
+ else
+ {
+ logger.debug("Not resetting pool for {} because internode authenticator said not to connect", ep);
+ }
}
public void listen()
@@ -795,11 +808,22 @@ public final class MessagingService implements MessagingServiceMBean
connectionManagers.remove(to);
}
+ /**
+ * Get a connection pool to the specified endpoint. Constructs one if none exists.
+ *
+ * Can return null if the InternodeAuthenticator fails to authenticate the node.
+ * @param to
+ * @return The connection pool or null if internode authenticator says not to
+ */
public OutboundTcpConnectionPool getConnectionPool(InetAddress to)
{
OutboundTcpConnectionPool cp = connectionManagers.get(to);
if (cp == null)
{
+ //Don't attempt to connect to nodes that won't (or shouldn't) authenticate anyways
+ if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(to, OutboundTcpConnectionPool.portFor(to)))
+ return null;
+
cp = new OutboundTcpConnectionPool(to, backPressure.newState(to));
OutboundTcpConnectionPool existingPool = connectionManagers.putIfAbsent(to, cp);
if (existingPool != null)
@@ -811,10 +835,17 @@ public final class MessagingService implements MessagingServiceMBean
return cp;
}
-
+ /**
+ * Get a connection for a message to a specific endpoint. Constructs one if none exists.
+ *
+ * Can return null if the InternodeAuthenticator fails to authenticate the node.
+ * @param to
+ * @return The connection or null if internode authenticator says not to
+ */
public OutboundTcpConnection getConnection(InetAddress to, MessageOut msg)
{
- return getConnectionPool(to).getConnection(msg);
+ OutboundTcpConnectionPool cp = getConnectionPool(to);
+ return cp == null ? null : cp.getConnection(msg);
}
/**
@@ -968,7 +999,8 @@ public final class MessagingService implements MessagingServiceMBean
OutboundTcpConnection connection = getConnection(to, message);
// write it
- connection.enqueue(message, id);
+ if (connection != null)
+ connection.enqueue(message, id);
}
public <T> AsyncOneResponse<T> sendRR(MessageOut message, InetAddress to)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index feff527..9b19eab 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -249,6 +249,12 @@ public class OutboundTcpConnection extends FastThreadLocalThread
break inner;
}
}
+ catch (InternodeAuthFailed e)
+ {
+ logger.warn("Internode auth failed connecting to " + poolReference.endPoint());
+ //Remove the connection pool and other thread so messages aren't queued
+ MessagingService.instance().destroyConnectionPool(poolReference.endPoint());
+ }
catch (Exception e)
{
JVMStabilityInspector.inspectThrowable(e);
@@ -394,20 +400,27 @@ public class OutboundTcpConnection extends FastThreadLocalThread
}
@SuppressWarnings("resource")
- private boolean connect()
+ private boolean connect() throws InternodeAuthFailed
{
- logger.debug("Attempting to connect to {}", poolReference.endPoint());
+ InetAddress endpoint = poolReference.endPoint();
+ if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(endpoint, poolReference.portFor(endpoint)))
+ {
+ throw new InternodeAuthFailed();
+ }
+
+ logger.debug("Attempting to connect to {}", endpoint);
+
long start = System.nanoTime();
long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout());
while (System.nanoTime() - start < timeout)
{
- targetVersion = MessagingService.instance().getVersion(poolReference.endPoint());
+ targetVersion = MessagingService.instance().getVersion(endpoint);
try
{
socket = poolReference.newSocket();
socket.setKeepAlive(true);
- if (isLocalDC(poolReference.endPoint()))
+ if (isLocalDC(endpoint))
{
socket.setTcpNoDelay(INTRADC_TCP_NODELAY);
}
@@ -446,7 +459,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
}
else
{
- MessagingService.instance().setVersion(poolReference.endPoint(), maxTargetVersion);
+ MessagingService.instance().setVersion(endpoint, maxTargetVersion);
}
if (targetVersion > maxTargetVersion)
@@ -454,7 +467,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
logger.trace("Target max version is {}; will reconnect with that version", maxTargetVersion);
try
{
- if (DatabaseDescriptor.getSeeds().contains(poolReference.endPoint()))
+ if (DatabaseDescriptor.getSeeds().contains(endpoint))
logger.warn("Seed gossip version is {}; will not connect with that version", maxTargetVersion);
}
catch (Throwable e)
@@ -484,7 +497,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
if (shouldCompressConnection())
{
out.flush();
- logger.trace("Upgrading OutputStream to {} to be compressed", poolReference.endPoint());
+ logger.trace("Upgrading OutputStream to {} to be compressed", endpoint);
// TODO: custom LZ4 OS that supports BB write methods
LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
@@ -495,7 +508,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
checksum,
true)); // no async flushing
}
- logger.debug("Done connecting to {}", poolReference.endPoint());
+ logger.debug("Done connecting to {}", endpoint);
return true;
}
catch (SSLHandshakeException e)
@@ -508,7 +521,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
catch (IOException e)
{
socket = null;
- logger.debug("Unable to connect to {}", poolReference.endPoint(), e);
+ logger.debug("Unable to connect to {}", endpoint, e);
Uninterruptibles.sleepUninterruptibly(OPEN_RETRY_DELAY, TimeUnit.MILLISECONDS);
}
}
@@ -613,4 +626,6 @@ public class OutboundTcpConnection extends FastThreadLocalThread
return false;
}
}
+
+ private static class InternodeAuthFailed extends Exception {}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index 9f9ffee..20a8da6 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -148,6 +148,11 @@ public class OutboundTcpConnectionPool
}
}
+ public static int portFor(InetAddress endpoint)
+ {
+ return isEncryptedChannel(endpoint) ? DatabaseDescriptor.getSSLStoragePort() : DatabaseDescriptor.getStoragePort();
+ }
+
public InetAddress endPoint()
{
if (id.equals(FBUtilities.getBroadcastAddress()))
@@ -218,7 +223,7 @@ public class OutboundTcpConnectionPool
smallMessages.closeSocket(true);
if (gossipMessages != null)
gossipMessages.closeSocket(true);
-
- metrics.release();
+ if (metrics != null)
+ metrics.release();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 17cdd77..c8f8bc1 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -54,6 +54,7 @@ import static org.junit.Assert.fail;
public class DatabaseDescriptorRefTest
{
static final String[] validClasses = {
+ "org.apache.cassandra.auth.AllowAllInternodeAuthenticator",
"org.apache.cassandra.auth.IInternodeAuthenticator",
"org.apache.cassandra.auth.IAuthenticator",
"org.apache.cassandra.auth.IAuthorizer",
http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java b/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
new file mode 100644
index 0000000..232865a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.auth.IInternodeAuthenticator;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.MessagingServiceTest;
+
+public class ReconnectableSnitchHelperTest
+{
+ static final IInternodeAuthenticator originalAuthenticator = DatabaseDescriptor.getInternodeAuthenticator();
+
+ @BeforeClass
+ public static void beforeClass() throws UnknownHostException
+ {
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setBackPressureStrategy(new MessagingServiceTest.MockBackPressureStrategy(Collections.emptyMap()));
+ DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.1"));
+ }
+
+ /**
+ * Make sure that if a node fails internode authentication and MessagingService returns a null
+ * pool that ReconnectableSnitchHelper fails gracefully.
+ */
+ @Test
+ public void failedAuthentication() throws Exception
+ {
+ DatabaseDescriptor.setInternodeAuthenticator(MessagingServiceTest.ALLOW_NOTHING_AUTHENTICATOR);
+ InetAddress address = InetAddress.getByName("127.0.0.250");
+ //Should tolerate null returns by MS for the connection
+ ReconnectableSnitchHelper.reconnect(address, address, null, null);
+ }
+
+ @After
+ public void replaceAuthenticator()
+ {
+ DatabaseDescriptor.setInternodeAuthenticator(originalAuthenticator);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index d9a9915..e6b5cd0 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -36,12 +36,16 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.Iterables;
import com.codahale.metrics.Timer;
+import org.apache.cassandra.auth.IInternodeAuthenticator;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.monitoring.ApproximateTime;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
import org.caffinitas.ohc.histo.EstimatedHistogram;
+import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -52,6 +56,20 @@ public class MessagingServiceTest
{
private final static long ONE_SECOND = TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
private final static long[] bucketOffsets = new EstimatedHistogram(160).getBucketOffsets();
+ public static final IInternodeAuthenticator ALLOW_NOTHING_AUTHENTICATOR = new IInternodeAuthenticator()
+ {
+ public boolean authenticate(InetAddress remoteAddress, int remotePort)
+ {
+ return false;
+ }
+
+ public void validateConfiguration() throws ConfigurationException
+ {
+
+ }
+ };
+ static final IInternodeAuthenticator originalAuthenticator = DatabaseDescriptor.getInternodeAuthenticator();
+
private final MessagingService messagingService = MessagingService.test();
@BeforeClass
@@ -368,4 +386,46 @@ public class MessagingServiceTest
throw new UnsupportedOperationException("Not supported.");
}
}
+
+ /**
+ * Make sure that if internode authenticatino fails for an outbound connection that all the code that relies
+ * on getting the connection pool handles the null return
+ * @throws Exception
+ */
+ @Test
+ public void testFailedInternodeAuth() throws Exception
+ {
+ MessagingService ms = MessagingService.instance();
+ DatabaseDescriptor.setInternodeAuthenticator(ALLOW_NOTHING_AUTHENTICATOR);
+ InetAddress address = InetAddress.getByName("127.0.0.250");
+
+ //Should return null
+ assertNull(ms.getConnectionPool(address));
+ assertNull(ms.getConnection(address, new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK)));
+
+ //Should tolerate null
+ ms.convict(address);
+ ms.sendOneWay(new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK), address);
+ }
+
+ @Test
+ public void testOutboundTcpConnectionCleansUp() throws Exception
+ {
+ MessagingService ms = MessagingService.instance();
+ DatabaseDescriptor.setInternodeAuthenticator(ALLOW_NOTHING_AUTHENTICATOR);
+ InetAddress address = InetAddress.getByName("127.0.0.250");
+ OutboundTcpConnectionPool pool = new OutboundTcpConnectionPool(address, new MockBackPressureStrategy(null).newState(address));
+ ms.connectionManagers.put(address, pool);
+ pool.smallMessages.start();
+ pool.smallMessages.enqueue(new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK), 0);
+ pool.smallMessages.join();
+ assertFalse(ms.connectionManagers.containsKey(address));
+ }
+
+ @After
+ public void replaceAuthenticator()
+ {
+ DatabaseDescriptor.setInternodeAuthenticator(originalAuthenticator);
+ }
+
}