You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/12/12 22:36:26 UTC
[23/46] geode git commit: GEODE-2133: Changed AckReaderThread to not
shutdown while logging batch exceptions
GEODE-2133: Changed AckReaderThread to not shutdown while logging batch exceptions
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/bb419793
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/bb419793
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/bb419793
Branch: refs/heads/feature/GEODE-1930
Commit: bb419793a4ff199c700ef0f327040bee05753d08
Parents: 213cf04
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Fri Nov 18 16:17:13 2016 -0800
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Sun Nov 20 17:01:08 2016 -0800
----------------------------------------------------------------------
.../geode/internal/i18n/LocalizedStrings.java | 6 +-
.../client/internal/GatewaySenderBatchOp.java | 9 +-
.../wan/GatewaySenderEventRemoteDispatcher.java | 93 +++++++++++---------
.../cache/wan/AckReaderThreadJUnitTest.java | 83 +++++++++++++++++
4 files changed, 145 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/bb419793/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
index 7638cb3..fc59922 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
@@ -7201,7 +7201,8 @@ public class LocalizedStrings {
"Hostname is unknown: {0}. Creating pool with unknown host in case the host becomes known later.");
public static final StringId GatewaySenderEventRemoteDispatcher_GATEWAY_SENDER_0_RECEIVED_ACK_FOR_BATCH_ID_1_WITH_EXCEPTION =
- new StringId(5302, "Gateway Sender {0} : Received ack for batch id {1} with exception:");
+ new StringId(5302,
+ "Gateway Sender {0} : Received ack for batch id {1} with one or more exceptions");
public static final StringId Region_REGION_0_HAS_1_GATEWAY_SENDER_IDS_ANOTHER_CACHE_HAS_THE_SAME_REGION_WITH_2_GATEWAY_SENDER_IDS_FOR_REGION_ACROSS_ALL_MEMBERS_IN_DS_GATEWAY_SENDER_IDS_SHOULD_BE_SAME =
new StringId(5303,
@@ -7653,6 +7654,9 @@ public class LocalizedStrings {
public static StringId GEMFIRE_CACHE_SECURITY_MISCONFIGURATION_2 =
new StringId(6645, "A server must use cluster configuration when joining a secured cluster.");
+ public static final StringId GatewayEventRemoteDispatcher_AN_EXCEPTION_OCCURRED_PROCESSING_A_BATCHEXCEPTION__0 =
+ new StringId(6646,
+ "An unexpected exception occurred processing a BatchException. The thread will continue.");
/** Testing strings, messageId 90000-99999 **/
http://git-wip-us.apache.org/repos/asf/geode/blob/bb419793/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java
index 5ed8454..7fc762fe6 100755
--- a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java
@@ -257,10 +257,11 @@ public class GatewaySenderBatchOp {
if (obj instanceof List) {
List<BatchException70> l = (List<BatchException70>) part0.getObject();
- // if (logger.isDebugEnabled()) {
- logger.info("We got an exception from the GatewayReceiver. MessageType : {} obj :{}",
- msg.getMessageType(), obj);
- // }
+ if (logger.isDebugEnabled()) {
+ logger.info(
+ "We got an exception from the GatewayReceiver. MessageType : {} obj :{}",
+ msg.getMessageType(), obj);
+ }
// don't throw Exception but set it in the Ack
BatchException70 be = new BatchException70(l);
ack = new GatewayAck(be, l.get(0).getBatchId());
http://git-wip-us.apache.org/repos/asf/geode/blob/bb419793/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 6f5078b..16b1965 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -556,7 +556,11 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
private volatile boolean ackReaderThreadRunning = false;
public AckReaderThread(GatewaySender sender, AbstractGatewaySenderEventProcessor processor) {
- super("AckReaderThread for : " + processor.getName());
+ this(sender, processor.getName());
+ }
+
+ public AckReaderThread(GatewaySender sender, String name) {
+ super("AckReaderThread for : " + name);
this.setDaemon(true);
this.cache = (GemFireCacheImpl) ((AbstractGatewaySender) sender).getCache();
}
@@ -610,9 +614,9 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
// If the batch is successfully processed, remove it from the
// queue.
if (gotBatchException) {
- logger.info(LocalizedMessage.create(
+ logger.warn(LocalizedMessage.create(
LocalizedStrings.GatewaySenderEventRemoteDispatcher_GATEWAY_SENDER_0_RECEIVED_ACK_FOR_BATCH_ID_1_WITH_EXCEPTION,
- new Object[] {processor.getSender(), ack.getBatchId()}, ack.getBatchException()));
+ new Object[] {processor.getSender(), ack.getBatchId()}));
// If we get PDX related exception in the batch exception then try
// to resend all the pdx events as well in the next batch.
final GatewaySenderStats statistics = sender.getStatistics();
@@ -674,57 +678,64 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
* @param exception
*
*/
- private void logBatchExceptions(BatchException70 exception) {
- for (BatchException70 be : exception.getExceptions()) {
- boolean logWarning = true;
- if (be.getCause() instanceof RegionDestroyedException) {
- RegionDestroyedException rde = (RegionDestroyedException) be.getCause();
- synchronized (notFoundRegionsSync) {
- if (notFoundRegions.contains(rde.getRegionFullPath())) {
- logWarning = false;
- } else {
- notFoundRegions.add(rde.getRegionFullPath());
+ protected void logBatchExceptions(BatchException70 exception) {
+ try {
+ for (BatchException70 be : exception.getExceptions()) {
+ boolean logWarning = true;
+ if (be.getCause() instanceof RegionDestroyedException) {
+ RegionDestroyedException rde = (RegionDestroyedException) be.getCause();
+ synchronized (notFoundRegionsSync) {
+ if (notFoundRegions.contains(rde.getRegionFullPath())) {
+ logWarning = false;
+ } else {
+ notFoundRegions.add(rde.getRegionFullPath());
+ }
}
+ } else if (be.getCause() instanceof IllegalStateException
+ && be.getCause().getMessage().contains("Unknown pdx type")) {
+ List<GatewaySenderEventImpl> pdxEvents =
+ processor.getBatchIdToPDXEventsMap().get(be.getBatchId());
+ if (logWarning) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_PDX_EVENT__0,
+ be.getIndex()), be);
+ }
+ if (pdxEvents != null) {
+ for (GatewaySenderEventImpl senderEvent : pdxEvents) {
+ senderEvent.isAcked = false;
+ }
+ GatewaySenderEventImpl gsEvent = pdxEvents.get(be.getIndex());
+ if (logWarning) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0,
+ gsEvent));
+ }
+ }
+ continue;
}
- } else if (be.getCause() instanceof IllegalStateException
- && be.getCause().getMessage().contains("Unknown pdx type")) {
- List<GatewaySenderEventImpl> pdxEvents =
- processor.getBatchIdToPDXEventsMap().get(be.getBatchId());
if (logWarning) {
logger.warn(LocalizedMessage.create(
- LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_PDX_EVENT__0,
+ LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_EVENT__0,
be.getIndex()), be);
}
- if (pdxEvents != null) {
- for (GatewaySenderEventImpl senderEvent : pdxEvents) {
- senderEvent.isAcked = false;
- }
- GatewaySenderEventImpl gsEvent = pdxEvents.get(be.getIndex());
+ List<GatewaySenderEventImpl>[] eventsArr =
+ processor.getBatchIdToEventsMap().get(be.getBatchId());
+ if (eventsArr != null) {
+ List<GatewaySenderEventImpl> filteredEvents = eventsArr[1];
+ GatewaySenderEventImpl gsEvent =
+ (GatewaySenderEventImpl) filteredEvents.get(be.getIndex());
if (logWarning) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0,
gsEvent));
}
}
- continue;
- }
- if (logWarning) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_EVENT__0,
- be.getIndex()), be);
- }
- List<GatewaySenderEventImpl>[] eventsArr =
- processor.getBatchIdToEventsMap().get(be.getBatchId());
- if (eventsArr != null) {
- List<GatewaySenderEventImpl> filteredEvents = eventsArr[1];
- GatewaySenderEventImpl gsEvent =
- (GatewaySenderEventImpl) filteredEvents.get(be.getIndex());
- if (logWarning) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0,
- gsEvent));
- }
}
+ } catch (Exception e) {
+ logger.warn(
+ LocalizedMessage.create(
+ LocalizedStrings.GatewayEventRemoteDispatcher_AN_EXCEPTION_OCCURRED_PROCESSING_A_BATCHEXCEPTION__0),
+ e);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/bb419793/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/AckReaderThreadJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/AckReaderThreadJUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/AckReaderThreadJUnitTest.java
new file mode 100644
index 0000000..1b1ed0a
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/AckReaderThreadJUnitTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.test.fake.Fakes;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@Category(UnitTest.class)
+public class AckReaderThreadJUnitTest {
+
+ private GemFireCacheImpl cache;
+ private AbstractGatewaySender sender;
+ private GatewaySenderEventRemoteDispatcher dispatcher;
+
+ @Before
+ public void setUpGemFire() {
+ createCache();
+ createSender();
+ createDispatcher();
+ }
+
+ private void createCache() {
+ // Mock cache
+ this.cache = Fakes.cache();
+ GemFireCacheImpl.setInstanceForTests(this.cache);
+ }
+
+ private void createSender() {
+ // Mock gateway sender
+ this.sender = mock(AbstractGatewaySender.class);
+ when(this.sender.getCache()).thenReturn(this.cache);
+ }
+
+ private void createDispatcher() {
+ this.dispatcher = mock(GatewaySenderEventRemoteDispatcher.class);
+ }
+
+ @After
+ public void tearDownGemFire() {
+ GemFireCacheImpl.setInstanceForTests(null);
+ }
+
+ @Test
+ public void testLogBatchExceptions() throws Exception {
+ // Create AckReaderThread
+ GatewaySenderEventRemoteDispatcher.AckReaderThread thread =
+ this.dispatcher.new AckReaderThread(this.sender, "AckReaderThread");
+
+ // Create parent BatchException containing a NullPointerException with no index
+ List<BatchException70> batchExceptions = new ArrayList();
+ batchExceptions
+ .add(new BatchException70("null pointer exception", new NullPointerException(), -1, 0));
+ BatchException70 batchException = new BatchException70(batchExceptions);
+
+ // Attempt to handle the parent BatchException. If this method fails, an Exception will be
+ // thrown, and this test will fail. If it succeeds, there won't be an exception, and the test
+ // will fall through.
+ thread.logBatchExceptions(batchException);
+ }
+}