You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2018/01/04 21:46:53 UTC
[geode] branch develop updated: GEODE-4096: Fixed race condition
for connection global variable
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 6c37ff4 GEODE-4096: Fixed race condition for connection global variable
6c37ff4 is described below
commit 6c37ff4a2a4fc6d609626b172793e3a123f9be82
Author: nabarun <nn...@pivotal.io>
AuthorDate: Wed Dec 13 16:40:49 2017 -0800
GEODE-4096: Fixed race condition for connection global variable
* Information on how the race condition occurs is provided in the GEODE-4096 ticket.
* Before returning null and clearing out the global variable connection, getConnection now calls stop on the dispatcher.
* This makes sure that the AckReaderThread for the dispatcher is shut down and prevents lingering threads from holding the connection life cycle lock.
---
.../wan/GatewaySenderEventRemoteDispatcher.java | 27 ++++++++++---
...atewaySenderEventRemoteDispatcherJUnitTest.java | 45 ++++++++++++++++++++++
2 files changed, 67 insertions(+), 5 deletions(-)
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 3a15342..782d7c0 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -48,7 +48,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
private static final Logger logger = LogService.getLogger();
- private final AbstractGatewaySenderEventProcessor processor;
+ protected final AbstractGatewaySenderEventProcessor processor;
private volatile Connection connection;
@@ -67,6 +67,10 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
*/
private int failedConnectCount = 0;
+ void setAckReaderThread(AckReaderThread ackReaderThread) {
+ this.ackReaderThread = ackReaderThread;
+ }
+
public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor eventProcessor) {
this.processor = eventProcessor;
this.sender = eventProcessor.getSender();
@@ -77,9 +81,17 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
if (e.getCause() instanceof GemFireSecurityException) {
throw e;
}
+
}
}
+ GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor processor,
+ Connection connection) {
+ this.processor = processor;
+ this.sender = processor.getSender();
+ this.connection = connection;
+ }
+
protected GatewayAck readAcknowledgement() {
SenderProxy sp = new SenderProxy(this.processor.getSender().getProxy());
GatewayAck ack = null;
@@ -299,6 +311,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
*/
public Connection getConnection(boolean startAckReaderThread) throws GatewaySenderException {
if (this.processor.isStopped()) {
+ stop();
return null;
}
// IF the connection is null
@@ -364,7 +377,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
*/
private void initializeConnection() throws GatewaySenderException, GemFireSecurityException {
if (ackReaderThread != null) {
- ackReaderThread.shutDownAckReaderConnection();
+ ackReaderThread.shutDownAckReaderConnection(connection);
}
this.connectionLifeCycleLock.writeLock().lock();
try {
@@ -560,6 +573,10 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
this(sender, processor.getName());
}
+ boolean isShutdown() {
+ return shutdown;
+ }
+
public AckReaderThread(GatewaySender sender, String name) {
super("AckReaderThread for : " + name);
this.setDaemon(true);
@@ -751,7 +768,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
// get chance to destroy unless that returns.
Connection conn = connection;
if (conn != null) {
- shutDownAckReaderConnection();
+ shutDownAckReaderConnection(conn);
if (!conn.isDestroyed()) {
conn.destroy();
sender.getProxy().returnConnection(conn);
@@ -774,7 +791,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
}
}
- private void shutDownAckReaderConnection() {
+ private void shutDownAckReaderConnection(Connection connection) {
Connection conn = connection;
// attempt to unblock the ackReader thread by shutting down the inputStream, if it was stuck
// on a read
@@ -808,7 +825,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
public void shutDownAckReaderConnection() {
if (ackReaderThread != null) {
- ackReaderThread.shutDownAckReaderConnection();
+ ackReaderThread.shutDownAckReaderConnection(connection);
}
}
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
new file mode 100644
index 0000000..6480d85
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class GatewaySenderEventRemoteDispatcherJUnitTest {
+ @Test
+ public void getConnectionShouldShutdownTheAckThreadReaderWhenEventProcessorIsShutDown() {
+ AbstractGatewaySender sender = mock(AbstractGatewaySender.class);
+ AbstractGatewaySenderEventProcessor eventProcessor =
+ mock(AbstractGatewaySenderEventProcessor.class);
+ GatewaySenderEventRemoteDispatcher dispatcher =
+ new GatewaySenderEventRemoteDispatcher(eventProcessor, null);
+ GatewaySenderEventRemoteDispatcher.AckReaderThread ackReaderThread =
+ dispatcher.new AckReaderThread(sender, "AckReaderThread");
+ dispatcher.setAckReaderThread(ackReaderThread);
+ assertFalse(ackReaderThread.isShutdown());
+ when(eventProcessor.isStopped()).thenReturn(true);
+ assertNull(dispatcher.getConnection(false));
+ assertTrue(ackReaderThread.isShutdown());
+ }
+}
--
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].