You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/12/12 22:36:26 UTC

[23/46] geode git commit: GEODE-2133: Changed AckReaderThread to not shutdown while logging batch exceptions

GEODE-2133: Changed AckReaderThread to not shutdown while logging batch exceptions


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/bb419793
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/bb419793
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/bb419793

Branch: refs/heads/feature/GEODE-1930
Commit: bb419793a4ff199c700ef0f327040bee05753d08
Parents: 213cf04
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Fri Nov 18 16:17:13 2016 -0800
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Sun Nov 20 17:01:08 2016 -0800

----------------------------------------------------------------------
 .../geode/internal/i18n/LocalizedStrings.java   |  6 +-
 .../client/internal/GatewaySenderBatchOp.java   |  9 +-
 .../wan/GatewaySenderEventRemoteDispatcher.java | 93 +++++++++++---------
 .../cache/wan/AckReaderThreadJUnitTest.java     | 83 +++++++++++++++++
 4 files changed, 145 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/bb419793/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
index 7638cb3..fc59922 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
@@ -7201,7 +7201,8 @@ public class LocalizedStrings {
       "Hostname is unknown: {0}. Creating pool with unknown host in case the host becomes known later.");
 
   public static final StringId GatewaySenderEventRemoteDispatcher_GATEWAY_SENDER_0_RECEIVED_ACK_FOR_BATCH_ID_1_WITH_EXCEPTION =
-      new StringId(5302, "Gateway Sender {0} : Received ack for batch id {1} with exception:");
+      new StringId(5302,
+          "Gateway Sender {0} : Received ack for batch id {1} with one or more exceptions");
 
   public static final StringId Region_REGION_0_HAS_1_GATEWAY_SENDER_IDS_ANOTHER_CACHE_HAS_THE_SAME_REGION_WITH_2_GATEWAY_SENDER_IDS_FOR_REGION_ACROSS_ALL_MEMBERS_IN_DS_GATEWAY_SENDER_IDS_SHOULD_BE_SAME =
       new StringId(5303,
@@ -7653,6 +7654,9 @@ public class LocalizedStrings {
   public static StringId GEMFIRE_CACHE_SECURITY_MISCONFIGURATION_2 =
       new StringId(6645, "A server must use cluster configuration when joining a secured cluster.");
 
+  public static final StringId GatewayEventRemoteDispatcher_AN_EXCEPTION_OCCURRED_PROCESSING_A_BATCHEXCEPTION__0 =
+      new StringId(6646,
+          "An unexpected exception occurred processing a BatchException. The thread will continue.");
 
   /** Testing strings, messageId 90000-99999 **/
 

http://git-wip-us.apache.org/repos/asf/geode/blob/bb419793/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java
index 5ed8454..7fc762fe6 100755
--- a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java
@@ -257,10 +257,11 @@ public class GatewaySenderBatchOp {
             if (obj instanceof List) {
               List<BatchException70> l = (List<BatchException70>) part0.getObject();
 
-              // if (logger.isDebugEnabled()) {
-              logger.info("We got an exception from the GatewayReceiver. MessageType : {} obj :{}",
-                  msg.getMessageType(), obj);
-              // }
+              if (logger.isDebugEnabled()) {
+                logger.info(
+                    "We got an exception from the GatewayReceiver. MessageType : {} obj :{}",
+                    msg.getMessageType(), obj);
+              }
               // don't throw Exception but set it in the Ack
               BatchException70 be = new BatchException70(l);
               ack = new GatewayAck(be, l.get(0).getBatchId());

http://git-wip-us.apache.org/repos/asf/geode/blob/bb419793/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 6f5078b..16b1965 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -556,7 +556,11 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
     private volatile boolean ackReaderThreadRunning = false;
 
     public AckReaderThread(GatewaySender sender, AbstractGatewaySenderEventProcessor processor) {
-      super("AckReaderThread for : " + processor.getName());
+      this(sender, processor.getName());
+    }
+
+    public AckReaderThread(GatewaySender sender, String name) {
+      super("AckReaderThread for : " + name);
       this.setDaemon(true);
       this.cache = (GemFireCacheImpl) ((AbstractGatewaySender) sender).getCache();
     }
@@ -610,9 +614,9 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
             // If the batch is successfully processed, remove it from the
             // queue.
             if (gotBatchException) {
-              logger.info(LocalizedMessage.create(
+              logger.warn(LocalizedMessage.create(
                   LocalizedStrings.GatewaySenderEventRemoteDispatcher_GATEWAY_SENDER_0_RECEIVED_ACK_FOR_BATCH_ID_1_WITH_EXCEPTION,
-                  new Object[] {processor.getSender(), ack.getBatchId()}, ack.getBatchException()));
+                  new Object[] {processor.getSender(), ack.getBatchId()}));
               // If we get PDX related exception in the batch exception then try
               // to resend all the pdx events as well in the next batch.
               final GatewaySenderStats statistics = sender.getStatistics();
@@ -674,57 +678,64 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
      * @param exception
      * 
      */
-    private void logBatchExceptions(BatchException70 exception) {
-      for (BatchException70 be : exception.getExceptions()) {
-        boolean logWarning = true;
-        if (be.getCause() instanceof RegionDestroyedException) {
-          RegionDestroyedException rde = (RegionDestroyedException) be.getCause();
-          synchronized (notFoundRegionsSync) {
-            if (notFoundRegions.contains(rde.getRegionFullPath())) {
-              logWarning = false;
-            } else {
-              notFoundRegions.add(rde.getRegionFullPath());
+    protected void logBatchExceptions(BatchException70 exception) {
+      try {
+        for (BatchException70 be : exception.getExceptions()) {
+          boolean logWarning = true;
+          if (be.getCause() instanceof RegionDestroyedException) {
+            RegionDestroyedException rde = (RegionDestroyedException) be.getCause();
+            synchronized (notFoundRegionsSync) {
+              if (notFoundRegions.contains(rde.getRegionFullPath())) {
+                logWarning = false;
+              } else {
+                notFoundRegions.add(rde.getRegionFullPath());
+              }
             }
+          } else if (be.getCause() instanceof IllegalStateException
+              && be.getCause().getMessage().contains("Unknown pdx type")) {
+            List<GatewaySenderEventImpl> pdxEvents =
+                processor.getBatchIdToPDXEventsMap().get(be.getBatchId());
+            if (logWarning) {
+              logger.warn(LocalizedMessage.create(
+                  LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_PDX_EVENT__0,
+                  be.getIndex()), be);
+            }
+            if (pdxEvents != null) {
+              for (GatewaySenderEventImpl senderEvent : pdxEvents) {
+                senderEvent.isAcked = false;
+              }
+              GatewaySenderEventImpl gsEvent = pdxEvents.get(be.getIndex());
+              if (logWarning) {
+                logger.warn(LocalizedMessage.create(
+                    LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0,
+                    gsEvent));
+              }
+            }
+            continue;
           }
-        } else if (be.getCause() instanceof IllegalStateException
-            && be.getCause().getMessage().contains("Unknown pdx type")) {
-          List<GatewaySenderEventImpl> pdxEvents =
-              processor.getBatchIdToPDXEventsMap().get(be.getBatchId());
           if (logWarning) {
             logger.warn(LocalizedMessage.create(
-                LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_PDX_EVENT__0,
+                LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_EVENT__0,
                 be.getIndex()), be);
           }
-          if (pdxEvents != null) {
-            for (GatewaySenderEventImpl senderEvent : pdxEvents) {
-              senderEvent.isAcked = false;
-            }
-            GatewaySenderEventImpl gsEvent = pdxEvents.get(be.getIndex());
+          List<GatewaySenderEventImpl>[] eventsArr =
+              processor.getBatchIdToEventsMap().get(be.getBatchId());
+          if (eventsArr != null) {
+            List<GatewaySenderEventImpl> filteredEvents = eventsArr[1];
+            GatewaySenderEventImpl gsEvent =
+                (GatewaySenderEventImpl) filteredEvents.get(be.getIndex());
             if (logWarning) {
               logger.warn(LocalizedMessage.create(
                   LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0,
                   gsEvent));
             }
           }
-          continue;
-        }
-        if (logWarning) {
-          logger.warn(LocalizedMessage.create(
-              LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_EVENT__0,
-              be.getIndex()), be);
-        }
-        List<GatewaySenderEventImpl>[] eventsArr =
-            processor.getBatchIdToEventsMap().get(be.getBatchId());
-        if (eventsArr != null) {
-          List<GatewaySenderEventImpl> filteredEvents = eventsArr[1];
-          GatewaySenderEventImpl gsEvent =
-              (GatewaySenderEventImpl) filteredEvents.get(be.getIndex());
-          if (logWarning) {
-            logger.warn(LocalizedMessage.create(
-                LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0,
-                gsEvent));
-          }
         }
+      } catch (Exception e) {
+        logger.warn(
+            LocalizedMessage.create(
+                LocalizedStrings.GatewayEventRemoteDispatcher_AN_EXCEPTION_OCCURRED_PROCESSING_A_BATCHEXCEPTION__0),
+            e);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/bb419793/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/AckReaderThreadJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/AckReaderThreadJUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/AckReaderThreadJUnitTest.java
new file mode 100644
index 0000000..1b1ed0a
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/AckReaderThreadJUnitTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.test.fake.Fakes;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@Category(UnitTest.class)
+public class AckReaderThreadJUnitTest {
+
+  private GemFireCacheImpl cache;
+  private AbstractGatewaySender sender;
+  private GatewaySenderEventRemoteDispatcher dispatcher;
+
+  @Before
+  public void setUpGemFire() {
+    createCache();
+    createSender();
+    createDispatcher();
+  }
+
+  private void createCache() {
+    // Mock cache
+    this.cache = Fakes.cache();
+    GemFireCacheImpl.setInstanceForTests(this.cache);
+  }
+
+  private void createSender() {
+    // Mock gateway sender
+    this.sender = mock(AbstractGatewaySender.class);
+    when(this.sender.getCache()).thenReturn(this.cache);
+  }
+
+  private void createDispatcher() {
+    this.dispatcher = mock(GatewaySenderEventRemoteDispatcher.class);
+  }
+
+  @After
+  public void tearDownGemFire() {
+    GemFireCacheImpl.setInstanceForTests(null);
+  }
+
+  @Test
+  public void testLogBatchExceptions() throws Exception {
+    // Create AckReaderThread
+    GatewaySenderEventRemoteDispatcher.AckReaderThread thread =
+        this.dispatcher.new AckReaderThread(this.sender, "AckReaderThread");
+
+    // Create parent BatchException containing a NullPointerException with no index
+    List<BatchException70> batchExceptions = new ArrayList();
+    batchExceptions
+        .add(new BatchException70("null pointer exception", new NullPointerException(), -1, 0));
+    BatchException70 batchException = new BatchException70(batchExceptions);
+
+    // Attempt to handle the parent BatchException. If this method fails, an Exception will be
+    // thrown, and this test will fail. If it succeeds, there won't be an exception, and the test
+    // will fall through.
+    thread.logBatchExceptions(batchException);
+  }
+}