You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2018/01/04 21:46:53 UTC

[geode] branch develop updated: GEODE-4096: Fixed race condition for connection global variable

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6c37ff4  GEODE-4096: Fixed race condition for connection global variable
6c37ff4 is described below

commit 6c37ff4a2a4fc6d609626b172793e3a123f9be82
Author: nabarun <nn...@pivotal.io>
AuthorDate: Wed Dec 13 16:40:49 2017 -0800

    GEODE-4096: Fixed race condition for connection global variable
    
    	* Information on how the race condition occurs is provided in the GEODE-4096 ticket.
    	* getConnection before returning null and clearing out the global variable connection calls stop on the dispatcher.
    	* This makes sure that AckReaderThreads for the dispatcher is shutdown and prevents lingering threads holding the connection life cycle lock.
---
 .../wan/GatewaySenderEventRemoteDispatcher.java    | 27 ++++++++++---
 ...atewaySenderEventRemoteDispatcherJUnitTest.java | 45 ++++++++++++++++++++++
 2 files changed, 67 insertions(+), 5 deletions(-)

diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 3a15342..782d7c0 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -48,7 +48,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
 
   private static final Logger logger = LogService.getLogger();
 
-  private final AbstractGatewaySenderEventProcessor processor;
+  protected final AbstractGatewaySenderEventProcessor processor;
 
   private volatile Connection connection;
 
@@ -67,6 +67,10 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
    */
   private int failedConnectCount = 0;
 
+  void setAckReaderThread(AckReaderThread ackReaderThread) {
+    this.ackReaderThread = ackReaderThread;
+  }
+
   public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor eventProcessor) {
     this.processor = eventProcessor;
     this.sender = eventProcessor.getSender();
@@ -77,9 +81,17 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
       if (e.getCause() instanceof GemFireSecurityException) {
         throw e;
       }
+
     }
   }
 
+  GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor processor,
+      Connection connection) {
+    this.processor = processor;
+    this.sender = processor.getSender();
+    this.connection = connection;
+  }
+
   protected GatewayAck readAcknowledgement() {
     SenderProxy sp = new SenderProxy(this.processor.getSender().getProxy());
     GatewayAck ack = null;
@@ -299,6 +311,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
    */
   public Connection getConnection(boolean startAckReaderThread) throws GatewaySenderException {
     if (this.processor.isStopped()) {
+      stop();
       return null;
     }
     // IF the connection is null
@@ -364,7 +377,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
    */
   private void initializeConnection() throws GatewaySenderException, GemFireSecurityException {
     if (ackReaderThread != null) {
-      ackReaderThread.shutDownAckReaderConnection();
+      ackReaderThread.shutDownAckReaderConnection(connection);
     }
     this.connectionLifeCycleLock.writeLock().lock();
     try {
@@ -560,6 +573,10 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
       this(sender, processor.getName());
     }
 
+    boolean isShutdown() {
+      return shutdown;
+    }
+
     public AckReaderThread(GatewaySender sender, String name) {
       super("AckReaderThread for : " + name);
       this.setDaemon(true);
@@ -751,7 +768,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
       // get chance to destroy unless that returns.
       Connection conn = connection;
       if (conn != null) {
-        shutDownAckReaderConnection();
+        shutDownAckReaderConnection(conn);
         if (!conn.isDestroyed()) {
           conn.destroy();
           sender.getProxy().returnConnection(conn);
@@ -774,7 +791,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
       }
     }
 
-    private void shutDownAckReaderConnection() {
+    private void shutDownAckReaderConnection(Connection connection) {
       Connection conn = connection;
       // attempt to unblock the ackReader thread by shutting down the inputStream, if it was stuck
       // on a read
@@ -808,7 +825,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
 
   public void shutDownAckReaderConnection() {
     if (ackReaderThread != null) {
-      ackReaderThread.shutDownAckReaderConnection();
+      ackReaderThread.shutDownAckReaderConnection(connection);
     }
   }
 
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
new file mode 100644
index 0000000..6480d85
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class GatewaySenderEventRemoteDispatcherJUnitTest {
+  @Test
+  public void getConnectionShouldShutdownTheAckThreadReaderWhenEventProcessorIsShutDown() {
+    AbstractGatewaySender sender = mock(AbstractGatewaySender.class);
+    AbstractGatewaySenderEventProcessor eventProcessor =
+        mock(AbstractGatewaySenderEventProcessor.class);
+    GatewaySenderEventRemoteDispatcher dispatcher =
+        new GatewaySenderEventRemoteDispatcher(eventProcessor, null);
+    GatewaySenderEventRemoteDispatcher.AckReaderThread ackReaderThread =
+        dispatcher.new AckReaderThread(sender, "AckReaderThread");
+    dispatcher.setAckReaderThread(ackReaderThread);
+    assertFalse(ackReaderThread.isShutdown());
+    when(eventProcessor.isStopped()).thenReturn(true);
+    assertNull(dispatcher.getConnection(false));
+    assertTrue(ackReaderThread.isShutdown());
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].