You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mi...@apache.org on 2022/07/01 15:46:44 UTC
[geode] branch develop updated: GEODE-9484: Improve sending message to multy destinations (#7664)
This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 1d25728e09 GEODE-9484: Improve sending message to multy destinations (#7664)
1d25728e09 is described below
commit 1d25728e09f788e2ca043cbaf393f01c4df576f9
Author: Mario Ivanac <48...@users.noreply.github.com>
AuthorDate: Fri Jul 1 17:46:37 2022 +0200
GEODE-9484: Improve sending message to multy destinations (#7664)
* GEODE-9484: New solution to first try only one attempt to create all connections
* GEODE-9484: added fix for NPE
---
....java => UpdatePropagationDistributedTest.java} | 107 ++++++++++++++++---
...ava => UpdatePropagationPRDistributedTest.java} | 2 +-
.../geode/internal/tcp/CloseConnectionTest.java | 2 +-
.../geode/internal/tcp/TCPConduitDUnitTest.java | 2 +-
.../distributed/internal/direct/DirectChannel.java | 44 +++++---
.../org/apache/geode/internal/tcp/Connection.java | 6 +-
.../apache/geode/internal/tcp/ConnectionTable.java | 30 ++++--
.../org/apache/geode/internal/tcp/TCPConduit.java | 118 ++++++++++++++++++---
.../internal/tcp/ConnectionTransmissionTest.java | 2 +-
.../apache/geode/internal/tcp/TCPConduitTest.java | 97 ++++++++++++++---
10 files changed, 343 insertions(+), 67 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDistributedTest.java
similarity index 78%
rename from geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java
rename to geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDistributedTest.java
index 0b99a144e5..055780782f 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDistributedTest.java
@@ -20,6 +20,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
@@ -50,6 +51,8 @@ import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.distributed.internal.ServerLocationAndMemberId;
+import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper;
+import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.NetworkUtils;
@@ -68,53 +71,89 @@ import org.apache.geode.util.internal.GeodeGlossary;
* the same across servers
*/
@Category({ClientSubscriptionTest.class})
-public class UpdatePropagationDUnitTest extends JUnit4CacheTestCase {
+public class UpdatePropagationDistributedTest extends JUnit4CacheTestCase {
private static final String REGION_NAME = "UpdatePropagationDUnitTest_region";
private VM server1 = null;
private VM server2 = null;
+ private VM server3 = null;
private VM client1 = null;
private VM client2 = null;
private int PORT1;
private int PORT2;
+ private int PORT3;
+
+ private final int minNumEntries = 2;
+
+ private String hostnameServer1;
+ private String hostnameServer3;
@Override
public final void postSetUp() throws Exception {
disconnectAllFromDS();
final Host host = Host.getHost(0);
- // Server1 VM
+
server1 = host.getVM(0);
- // Server2 VM
server2 = host.getVM(1);
- // Client 1 VM
- client1 = host.getVM(2);
+ server3 = host.getVM(2);
- // client 2 VM
- client2 = host.getVM(3);
+ client1 = host.getVM(3);
- PORT1 = server1.invoke(this::createServerCache);
- PORT2 = server2.invoke(this::createServerCache);
+ client2 = host.getVM(4);
- client1.invoke(
- () -> createClientCache(NetworkUtils.getServerHostName(server1.getHost()), PORT1, PORT2));
- client2.invoke(
- () -> createClientCache(NetworkUtils.getServerHostName(server1.getHost()), PORT1, PORT2));
+ PORT1 = server1.invoke(() -> createServerCache());
+ PORT2 = server2.invoke(() -> createServerCache());
+ PORT3 = server3.invoke(() -> createServerCache());
+
+ hostnameServer1 = NetworkUtils.getServerHostName(server1.getHost());
+ hostnameServer3 = NetworkUtils.getServerHostName(server3.getHost());
IgnoredException.addIgnoredException("java.net.SocketException");
IgnoredException.addIgnoredException("Unexpected IOException");
}
+
+
+ @Test
+ public void updatesArePropagatedToAllMembersWhenOneKilled() throws Exception {
+ client1.invoke(
+ () -> createClientCache(hostnameServer1, PORT1));
+ client2.invoke(
+ () -> createClientCache(hostnameServer3, PORT3));
+ int entries = 20;
+ AsyncInvocation invocation = client1.invokeAsync(() -> doPuts(entries));
+
+ // Wait for some entries to be put
+ server1.invoke(this::verifyMinEntriesInserted);
+
+ // Simulate crash
+ server2.invoke(() -> {
+ MembershipManagerHelper.crashDistributedSystem(getSystemStatic());
+ });
+
+ invocation.await();
+
+ int notNullEntriesIn1 = client1.invoke(() -> getNotNullEntriesNumber(entries));
+ int notNullEntriesIn3 = client2.invoke(() -> getNotNullEntriesNumber(entries));
+ assertThat(notNullEntriesIn3).isEqualTo(notNullEntriesIn1);
+ }
+
/**
* This tests whether updates are still received by the other clients when an
* Interest List failover occurs
*/
@Test
public void updatesAreProgegatedAfterFailover() {
+ client1.invoke(
+ () -> createClientCache(hostnameServer1, PORT1, PORT2));
+ client2.invoke(
+ () -> createClientCache(hostnameServer1, PORT1, PORT2));
+
// First create entries on both servers via the two client
client1.invoke(this::createEntriesK1andK2);
client2.invoke(this::createEntriesK1andK2);
@@ -248,6 +287,18 @@ public class UpdatePropagationDUnitTest extends JUnit4CacheTestCase {
.addCacheListener(new EventTrackingCacheListener()).create(REGION_NAME);
}
+ private void createClientCache(String host, Integer port1) {
+ Properties props = new Properties();
+ props.setProperty(LOCATORS, "");
+ ClientCacheFactory cf = new ClientCacheFactory();
+ cf.addPoolServer(host, port1).setPoolSubscriptionEnabled(false)
+ .setPoolSubscriptionRedundancy(-1).setPoolMinConnections(4).setPoolSocketBufferSize(1000)
+ .setPoolReadTimeout(100).setPoolPingInterval(300);
+ ClientCache cache = getClientCache(cf);
+ cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
+ .create(REGION_NAME);
+ }
+
private Integer createServerCache() throws Exception {
Cache cache = getCache();
RegionAttributes attrs = createCacheServerAttributes();
@@ -305,6 +356,36 @@ public class UpdatePropagationDUnitTest extends JUnit4CacheTestCase {
});
}
+ private void verifyMinEntriesInserted() {
+ await().untilAsserted(() -> assertThat(getCache().getRegion(SEPARATOR + REGION_NAME))
+ .hasSizeGreaterThan(minNumEntries));
+ }
+
+ private void doPuts(int entries) throws Exception {
+ Region<String, String> r1 = getCache().getRegion(REGION_NAME);
+ assertThat(r1).isNotNull();
+ for (int i = 0; i < entries; i++) {
+ try {
+ r1.put("" + i, "" + i);
+ } catch (Exception e) {
+ }
+ Thread.sleep(1000);
+ }
+ }
+
+ private int getNotNullEntriesNumber(int entries) {
+ int notNullEntries = 0;
+ Region<String, String> r1 = getCache().getRegion(SEPARATOR + REGION_NAME);
+ assertThat(r1).isNotNull();
+ for (int i = 0; i < entries; i++) {
+ Object value = r1.get("" + i, "" + i);
+ if (value != null) {
+ notNullEntries++;
+ }
+ }
+ return notNullEntries;
+ }
+
private static class EventTrackingCacheListener extends CacheListenerAdapter {
List<EntryEvent> receivedEvents = new ArrayList<>();
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDistributedTest.java
similarity index 93%
rename from geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java
rename to geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDistributedTest.java
index 47721ceb2c..77d903ee0e 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDistributedTest.java
@@ -21,7 +21,7 @@ import org.apache.geode.cache.RegionAttributes;
/**
* subclass of UpdatePropagationDUnitTest to exercise partitioned regions
*/
-public class UpdatePropagationPRDUnitTest extends UpdatePropagationDUnitTest {
+public class UpdatePropagationPRDistributedTest extends UpdatePropagationDistributedTest {
@Override
protected RegionAttributes createCacheServerAttributes() {
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
index cdb5432399..5aeba3fac2 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
@@ -110,7 +110,7 @@ public class CloseConnectionTest implements Serializable {
InternalDistributedSystem distributedSystem = getCache().getInternalDistributedSystem();
InternalDistributedMember otherMember = distributedSystem.getDistributionManager()
.getOtherNormalDistributionManagerIds().iterator().next();
- Connection connection = conTable.getConduit().getConnection(otherMember, true, false,
+ Connection connection = conTable.getConduit().getConnection(otherMember, true,
System.currentTimeMillis(), 15000, 0);
await().untilAsserted(() -> {
// grab the shared, ordered "sender" connection to vm0. It should have a residual
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
index 41d64c67f6..794d6e093d 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
@@ -110,7 +110,7 @@ public class TCPConduitDUnitTest extends DistributedTestCase {
assertThat(connectionTable.hasReceiversFor(otherMember)).isTrue();
Connection sharedUnordered = connectionTable.get(otherMember, false,
- System.currentTimeMillis(), 15000, 0);
+ System.currentTimeMillis(), 15000, 0, false);
sharedUnordered.requestClose("for testing");
// the sender connection has been closed so we should only have 2 senders now
assertThat(ConnectionTable.getNumSenderSharedConnections()).isEqualTo(2);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index a8a7bb8c20..eaac79f2b8 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -281,11 +281,17 @@ public class DirectChannel {
directReply = false;
}
if (ce != null) {
- if (failedCe != null) {
- failedCe.getMembers().addAll(ce.getMembers());
- failedCe.getCauses().addAll(ce.getCauses());
+
+ if (!retry) {
+ retryInfo = ce;
} else {
- failedCe = ce;
+
+ if (failedCe != null) {
+ failedCe.getMembers().addAll(ce.getMembers());
+ failedCe.getCauses().addAll(ce.getCauses());
+ } else {
+ failedCe = ce;
+ }
}
ce = null;
}
@@ -293,6 +299,9 @@ public class DirectChannel {
if (failedCe != null) {
throw failedCe;
}
+ if (retryInfo != null) {
+ continue;
+ }
return bytesWritten;
}
@@ -338,7 +347,12 @@ public class DirectChannel {
}
if (ce != null) {
- retryInfo = ce;
+ if (retryInfo != null) {
+ retryInfo.getMembers().addAll(ce.getMembers());
+ retryInfo.getCauses().addAll(ce.getCauses());
+ } else {
+ retryInfo = ce;
+ }
ce = null;
}
@@ -423,13 +437,13 @@ public class DirectChannel {
* @param retry whether this is a retransmission
* @param ackTimeout the ack warning timeout
* @param ackSDTimeout the ack severe alert timeout
- * @param cons a list to hold the connections
+ * @param connectionsList a list to hold the connections
* @return null if everything went okay, or a ConnectExceptions object if some connections
* couldn't be obtained
*/
private ConnectExceptions getConnections(Membership mgr, DistributionMessage msg,
InternalDistributedMember[] destinations, boolean preserveOrder, boolean retry,
- long ackTimeout, long ackSDTimeout, List cons) {
+ long ackTimeout, long ackSDTimeout, List<Connection> connectionsList) {
ConnectExceptions ce = null;
for (InternalDistributedMember destination : destinations) {
if (destination == null) {
@@ -458,12 +472,18 @@ public class DirectChannel {
if (ackTimeout > 0) {
startTime = System.currentTimeMillis();
}
- Connection con = conduit.getConnection(destination, preserveOrder, retry, startTime,
- ackTimeout, ackSDTimeout);
+ final Connection connection;
+ if (!retry) {
+ connection = conduit.getFirstScanForConnection(destination, preserveOrder, startTime,
+ ackTimeout, ackSDTimeout);
+ } else {
+ connection = conduit.getConnection(destination, preserveOrder, startTime,
+ ackTimeout, ackSDTimeout);
+ }
- con.setInUse(true, startTime, 0, 0, null); // fix for bug#37657
- cons.add(con);
- if (con.isSharedResource() && msg instanceof DirectReplyMessage) {
+ connection.setInUse(true, startTime, 0, 0, null); // fix for bug#37657
+ connectionsList.add(connection);
+ if (connection.isSharedResource() && msg instanceof DirectReplyMessage) {
DirectReplyMessage directMessage = (DirectReplyMessage) msg;
directMessage.registerProcessor();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 44205d4d63..9e921d7d03 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -961,7 +961,7 @@ public class Connection implements Runnable {
final ConnectionTable t,
final boolean preserveOrder, final InternalDistributedMember remoteAddr,
final boolean sharedResource,
- final long startTime, final long ackTimeout, final long ackSATimeout)
+ final long startTime, final long ackTimeout, final long ackSATimeout, boolean doNotRetry)
throws IOException, DistributedSystemDisconnectedException {
boolean success = false;
Connection conn = null;
@@ -1021,7 +1021,9 @@ public class Connection implements Runnable {
// do not change the text of this exception - it is looked for in exception handlers
throw new IOException("Cannot form connection to alert listener " + remoteAddr);
}
-
+ if (doNotRetry) {
+ throw new IOException("Connection not created in first try to " + remoteAddr);
+ }
// Wait briefly...
interrupted = Thread.interrupted() || interrupted;
try {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index f54f7bd9cd..f1d157d27f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -269,6 +269,7 @@ public class ConnectionTable {
* @param startTime the ms clock start time for the operation
* @param ackThreshold the ms ack-wait-threshold, or zero
* @param ackSAThreshold the ms ack-severe_alert-threshold, or zero
+ * @param doNotRetry if true, only a single attempt is made to create the connection (no retry)
* @return the Connection, or null if someone else already created or closed it
* @throws IOException if unable to connect
*/
@@ -276,13 +277,14 @@ public class ConnectionTable {
boolean sharedResource,
boolean preserveOrder, Map<DistributedMember, Object> m, PendingConnection pc, long startTime,
long ackThreshold,
- long ackSAThreshold) throws IOException, DistributedSystemDisconnectedException {
+ long ackSAThreshold, boolean doNotRetry)
+ throws IOException, DistributedSystemDisconnectedException {
// handle new pending connection
Connection con = null;
try {
long senderCreateStartTime = owner.getStats().startSenderCreate();
con = Connection.createSender(owner.getMembership(), this, preserveOrder, id,
- sharedResource, startTime, ackThreshold, ackSAThreshold);
+ sharedResource, startTime, ackThreshold, ackSAThreshold, doNotRetry);
owner.getStats().incSenders(sharedResource, preserveOrder, senderCreateStartTime);
} finally {
// our connection failed to notify anyone waiting for our pending con
@@ -350,11 +352,14 @@ public class ConnectionTable {
* @param startTime the ms clock start time for the operation
* @param ackTimeout the ms ack-wait-threshold, or zero
* @param ackSATimeout the ms ack-severe-alert-threshold, or zero
+ * @param doNotRetryWaitForConnection if true, do not retry creating the connection and do not
+ * wait for a connection that is already pending (null is returned in that case)
* @return the new Connection, or null if an error
* @throws IOException if unable to create the connection
*/
private Connection getSharedConnection(InternalDistributedMember id, boolean scheduleTimeout,
- boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout)
+ boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout,
+ boolean doNotRetryWaitForConnection)
throws IOException, DistributedSystemDisconnectedException {
final Map<DistributedMember, Object> m =
@@ -387,7 +392,7 @@ public class ConnectionTable {
logger.debug("created PendingConnection {}", pc);
}
result = handleNewPendingConnection(id, true, preserveOrder, m, pc,
- startTime, ackTimeout, ackSATimeout);
+ startTime, ackTimeout, ackSATimeout, doNotRetryWaitForConnection);
if (!preserveOrder && scheduleTimeout) {
scheduleIdleTimeout(result);
}
@@ -400,6 +405,10 @@ public class ConnectionTable {
throw new IOException("Cannot form connection to alert listener " + id);
}
+ if (doNotRetryWaitForConnection) {
+ return null;
+ }
+
result = ((PendingConnection) mEntry).waitForConnect(owner.getMembership(),
startTime, ackTimeout, ackSATimeout);
if (logger.isDebugEnabled()) {
@@ -425,11 +434,13 @@ public class ConnectionTable {
* @param startTime the ms clock start time for the operation
* @param ackTimeout the ms ack-wait-threshold, or zero
* @param ackSATimeout the ms ack-severe-alert-threshold, or zero
+ * @param doNotRetry if true, only a single attempt is made to create the connection (no retry)
* @return the connection, or null if an error
* @throws IOException if the connection could not be created
*/
Connection getThreadOwnedConnection(InternalDistributedMember id, long startTime, long ackTimeout,
- long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
+ long ackSATimeout, boolean doNotRetry)
+ throws IOException, DistributedSystemDisconnectedException {
Connection result;
// Look for result in the thread local
@@ -449,7 +460,7 @@ public class ConnectionTable {
// OK, we have to create a new connection.
long senderCreateStartTime = owner.getStats().startSenderCreate();
result = Connection.createSender(owner.getMembership(), this, true, id, false, startTime,
- ackTimeout, ackSATimeout);
+ ackTimeout, ackSATimeout, doNotRetry);
owner.getStats().incSenders(false, true, senderCreateStartTime);
if (logger.isDebugEnabled()) {
logger.debug("ConnectionTable: created an ordered connection: {}", result);
@@ -521,11 +532,12 @@ public class ConnectionTable {
* @param startTime the ms clock start time
* @param ackTimeout the ms ack-wait-threshold, or zero
* @param ackSATimeout the ms ack-severe-alert-threshold, or zero
+ * @param doNotRetry if true, only a single attempt is made to obtain the connection (no retry or wait)
* @return the new Connection, or null if a problem
* @throws IOException if the connection could not be created
*/
protected Connection get(InternalDistributedMember id, boolean preserveOrder, long startTime,
- long ackTimeout, long ackSATimeout)
+ long ackTimeout, long ackSATimeout, boolean doNotRetry)
throws IOException, DistributedSystemDisconnectedException {
if (closed) {
owner.getCancelCriterion().checkCancelInProgress(null);
@@ -535,9 +547,9 @@ public class ConnectionTable {
boolean threadOwnsResources = threadOwnsResources();
if (!preserveOrder || !threadOwnsResources) {
result = getSharedConnection(id, threadOwnsResources, preserveOrder, startTime, ackTimeout,
- ackSATimeout);
+ ackSATimeout, doNotRetry);
} else {
- result = getThreadOwnedConnection(id, startTime, ackTimeout, ackSATimeout);
+ result = getThreadOwnedConnection(id, startTime, ackTimeout, ackSATimeout, doNotRetry);
}
if (result != null) {
Assert.assertTrue(result.getPreserveOrder() == preserveOrder);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 4d6d9c8216..843b49c25f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -719,7 +719,6 @@ public class TCPConduit implements Runnable {
*
* @param memberAddress the IDS associated with the remoteId
* @param preserveOrder whether this is an ordered or unordered connection
- * @param retry false if this is the first attempt
* @param startTime the time this operation started
* @param ackTimeout the ack-wait-threshold * 1000 for the operation to be transmitted (or zero)
* @param ackSATimeout the ack-severe-alert-threshold * 1000 for the operation to be transmitted
@@ -728,7 +727,7 @@ public class TCPConduit implements Runnable {
* @return the connection
*/
public Connection getConnection(InternalDistributedMember memberAddress,
- final boolean preserveOrder, boolean retry, long startTime, long ackTimeout,
+ final boolean preserveOrder, long startTime, long ackTimeout,
long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
if (stopped) {
throw new DistributedSystemDisconnectedException("The conduit is stopped");
@@ -742,7 +741,7 @@ public class TCPConduit implements Runnable {
try {
// If this is the second time through this loop, we had problems.
// Tear down the connection so that it gets rebuilt.
- if (retry || conn != null) { // not first time in loop
+ if (conn != null) { // not first time in loop
if (!membership.memberExists(memberAddress)
|| membership.isShunned(memberAddress)
|| membership.shutdownInProgress()) {
@@ -777,18 +776,15 @@ public class TCPConduit implements Runnable {
// Close the connection (it will get rebuilt later).
getStats().incReconnectAttempts();
- if (conn != null) {
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("Closing old connection. conn={} before retrying. memberInTrouble={}",
- conn, memberInTrouble);
- }
- conn.closeForReconnect("closing before retrying");
- } catch (CancelException ex) {
- throw ex;
- } catch (Exception ex) {
- // ignored
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Closing old connection. conn={} before retrying. memberInTrouble={}",
+ conn, memberInTrouble);
}
+ conn.closeForReconnect("closing before retrying");
+ } catch (CancelException ex) {
+ throw ex;
+ } catch (Exception ignored) {
}
} // not first time in loop
@@ -801,7 +797,7 @@ public class TCPConduit implements Runnable {
do {
retryForOldConnection = false;
conn = getConTable().get(memberAddress, preserveOrder, startTime, ackTimeout,
- ackSATimeout);
+ ackSATimeout, false);
if (conn == null) {
// conduit may be closed - otherwise an ioexception would be thrown
problem = new IOException(
@@ -909,6 +905,98 @@ public class TCPConduit implements Runnable {
}
}
+ /**
+ * Return a connection to the given member. This method performs a quick scan for a connection:
+ * only one attempt is made to create a connection to the given member.
+ *
+ * @param memberAddress the IDS associated with the remoteId
+ * @param preserveOrder whether this is an ordered or unordered connection
+ * @param startTime the time this operation started
+ * @param ackTimeout the ack-wait-threshold * 1000 for the operation to be transmitted (or zero)
+ * @param ackSATimeout the ack-severe-alert-threshold * 1000 for the operation to be transmitted
+ * (or zero)
+ *
+ * @return the connection
+ */
+ public Connection getFirstScanForConnection(InternalDistributedMember memberAddress,
+ final boolean preserveOrder, long startTime, long ackTimeout,
+ long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
+ if (stopped) {
+ throw new DistributedSystemDisconnectedException("The conduit is stopped");
+ }
+
+ Connection connection = null;
+ stopper.checkCancelInProgress(null);
+ boolean interrupted = Thread.interrupted();
+ try {
+
+ Exception problem = null;
+ try {
+ connection = getConnectionThatIsNotClosed(memberAddress, preserveOrder, startTime,
+ ackTimeout, ackSATimeout);
+
+ // we have a connection; fall through and return it
+ } catch (ConnectionException e) {
+ // Race condition between acquiring the connection and attempting
+ // to use it: another thread closed it.
+ problem = e;
+ // No retry here: this first scan asks the connection table for a single
+ // attempt (doNotRetry is true), so the failure is handed back to the caller
+ // even though the member may still be in the view
+ } catch (IOException e) {
+ problem = e;
+ // don't keep trying to connect to an alert listener
+ if (AlertingAction.isThreadAlerting()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Giving up connecting to alert listener {}", memberAddress);
+ }
+ }
+ }
+
+ if (problem != null) {
+ if (problem instanceof IOException) {
+ if (problem.getMessage() != null
+ && problem.getMessage().startsWith("Cannot form connection to alert listener")) {
+ throw new AlertingIOException((IOException) problem);
+ }
+ throw (IOException) problem;
+ }
+ throw new IOException(
+ String.format("Problem connecting to %s", memberAddress), problem);
+ }
+ // Success!
+
+ return connection;
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private Connection getConnectionThatIsNotClosed(InternalDistributedMember memberAddress,
+ final boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout)
+ throws IOException, ConnectionException {
+ boolean debugEnabled = logger.isDebugEnabled();
+ Connection connection;
+ while (true) {
+ connection = getConTable().get(memberAddress, preserveOrder, startTime, ackTimeout,
+ ackSATimeout, true);
+ if (connection == null) {
+ throw new IOException("Unable to reconnect to server; possible shutdown: " + memberAddress);
+ }
+
+ if (!connection.isClosing() && connection.getRemoteAddress().equals(memberAddress)) {
+ return connection;
+ }
+ if (debugEnabled) {
+ logger.debug("Got an old connection for {}: {}@{}", memberAddress, connection,
+ connection.hashCode());
+ }
+ connection.closeOldConnection("closing old connection");
+ }
+ }
+
@Override
public String toString() {
return String.valueOf(id);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
index 5a041eb167..906a021dec 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
@@ -168,7 +168,7 @@ public class ConnectionTransmissionTest {
senderAddr.setDirectChannelPort(conduit.getPort());
return spy(Connection.createSender(membership, writerTable, true, remoteAddr, true,
- System.currentTimeMillis(), 1000, 1000));
+ System.currentTimeMillis(), 1000, 1000, false));
}
private Connection createReceiverConnectionOnFirstAccept(final ServerSocketChannel acceptorSocket,
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java
index 392a5993c0..e1b3ddf49e 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java
@@ -94,7 +94,8 @@ public class TCPConduitTest {
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
- .when(connectionTable).get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong());
+ .when(connectionTable)
+ .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean());
when(membership.memberExists(eq(member)))
.thenReturn(true);
when(membership.isShunned(same(member)))
@@ -102,7 +103,7 @@ public class TCPConduitTest {
AlertingAction.execute(() -> {
Throwable thrown = catchThrowable(() -> {
- tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
+ tcpConduit.getConnection(member, false, 0L, 0L, 0L);
});
assertThat(thrown)
@@ -123,13 +124,14 @@ public class TCPConduitTest {
doThrow(new IOException("Cannot form connection to alert listener"))
// getConnection will loop indefinitely until connectionTable returns connection
.doReturn(connection)
- .when(connectionTable).get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong());
+ .when(connectionTable)
+ .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean());
when(membership.memberExists(eq(member)))
.thenReturn(true);
when(membership.isShunned(same(member)))
.thenReturn(false);
- Connection value = tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
+ Connection value = tcpConduit.getConnection(member, false, 0L, 0L, 0L);
assertThat(value)
.isSameAs(connection);
@@ -143,12 +145,13 @@ public class TCPConduitTest {
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
- .when(connectionTable).get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong());
+ .when(connectionTable)
+ .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean());
when(membership.memberExists(eq(member)))
.thenReturn(false);
Throwable thrown = catchThrowable(() -> {
- tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
+ tcpConduit.getConnection(member, false, 0L, 0L, 0L);
});
assertThat(thrown)
@@ -164,14 +167,15 @@ public class TCPConduitTest {
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
- .when(connectionTable).get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong());
+ .when(connectionTable)
+ .get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean());
when(membership.memberExists(same(member)))
.thenReturn(true);
when(membership.isShunned(same(member)))
.thenReturn(true);
Throwable thrown = catchThrowable(() -> {
- tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
+ tcpConduit.getConnection(member, false, 0L, 0L, 0L);
});
assertThat(thrown)
@@ -188,7 +192,8 @@ public class TCPConduitTest {
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
- .when(connectionTable).get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong());
+ .when(connectionTable)
+ .get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean());
when(membership.memberExists(same(member)))
.thenReturn(true);
when(membership.isShunned(same(member)))
@@ -197,7 +202,7 @@ public class TCPConduitTest {
.thenReturn(true);
Throwable thrown = catchThrowable(() -> {
- tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
+ tcpConduit.getConnection(member, false, 0L, 0L, 0L);
});
assertThat(thrown)
@@ -214,7 +219,8 @@ public class TCPConduitTest {
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
- .when(connectionTable).get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong());
+ .when(connectionTable)
+ .get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean());
when(membership.memberExists(same(member)))
.thenReturn(true);
when(membership.isShunned(same(member)))
@@ -223,7 +229,7 @@ public class TCPConduitTest {
.thenReturn(true);
Throwable thrown = catchThrowable(() -> {
- tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
+ tcpConduit.getConnection(member, false, 0L, 0L, 0L);
});
assertThat(thrown)
@@ -231,6 +237,73 @@ public class TCPConduitTest {
.hasMessage("Abandoned because shutdown is in progress");
}
+ @Test
+ public void getFirstScanForConnectionThrowsAlertingIOException_ifCaughtIOException_whileAlerting()
+ throws Exception {
+ TCPConduit tcpConduit =
+ new TCPConduit(membership, 0, localHost, false, directChannel, mock(BufferPool.class),
+ new Properties(),
+ TCPConduit -> connectionTable, socketCreator, doNothing(), false);
+ InternalDistributedMember member = mock(InternalDistributedMember.class);
+ doThrow(new IOException("Cannot form connection to alert listener"))
+ .when(connectionTable)
+ .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean());
+
+ AlertingAction.execute(() -> {
+ Throwable thrown = catchThrowable(() -> {
+ tcpConduit.getFirstScanForConnection(member, false, 0L, 0L, 0L);
+ });
+
+ assertThat(thrown)
+ .isInstanceOf(AlertingIOException.class);
+ });
+ }
+
+ @Test
+ public void getFirstScanForConnectionRethrows_ifCaughtIOException_whileNotAlerting()
+ throws Exception {
+ TCPConduit tcpConduit =
+ new TCPConduit(membership, 0, localHost, false, directChannel, mock(BufferPool.class),
+ new Properties(),
+ TCPConduit -> connectionTable, socketCreator, doNothing(), false);
+ InternalDistributedMember member = mock(InternalDistributedMember.class);
+ Connection connection = mock(Connection.class);
+ doThrow(new IOException("Connection not created in first try"))
+ .doReturn(connection)
+ .when(connectionTable)
+ .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean());
+
+ Throwable thrown = catchThrowable(() -> {
+ tcpConduit.getFirstScanForConnection(member, false, 0L, 0L, 0L);
+ });
+
+ assertThat(thrown)
+ .isInstanceOf(IOException.class);
+ }
+
+
+ @Test
+ public void getFirstScanForConnectionRethrows_ifCaughtIOException_whithoutMessage()
+ throws Exception {
+ TCPConduit tcpConduit =
+ new TCPConduit(membership, 0, localHost, false, directChannel, mock(BufferPool.class),
+ new Properties(),
+ TCPConduit -> connectionTable, socketCreator, doNothing(), false);
+ InternalDistributedMember member = mock(InternalDistributedMember.class);
+ Connection connection = mock(Connection.class);
+ doThrow(new IOException())
+ .doReturn(connection)
+ .when(connectionTable)
+ .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean());
+
+ Throwable thrown = catchThrowable(() -> {
+ tcpConduit.getFirstScanForConnection(member, false, 0L, 0L, 0L);
+ });
+
+ assertThat(thrown)
+ .isInstanceOf(IOException.class);
+ }
+
private Runnable doNothing() {
return () -> {
// nothing