You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2021/07/19 17:42:57 UTC

[geode] branch support/1.14 updated: GEODE-9404: Do not log error message if sender is not configured. (#6659)

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.14 by this push:
     new f477b35  GEODE-9404: Do not log error message if sender is not configured. (#6659)
f477b35 is described below

commit f477b3573bd08f515c690c3abf212546709e1346
Author: Eric Shu <es...@pivotal.io>
AuthorDate: Fri Jul 9 12:09:31 2021 -0700

    GEODE-9404: Do not log error message if sender is not configured. (#6659)
    
     * This is normal case for serial wan configuration. Error message should not be
        logged when executing transactions.
    
    * Log error message only if some events in a tx configured to group transaction but
    others do not have sender configured.
    
    * Should not wait for lastTransactionEvent in a tx if no sender configured or sender
    does not set must group transaction.
    
    (cherry picked from commit f0e328ba3eb4a1b2b47bdca5d565b79e88fecdca)
---
 .../geode/internal/cache/TXCommitMessage.java      |  6 ++--
 .../cache/TXLastEventInTransactionUtils.java       | 21 +++++--------
 .../org/apache/geode/internal/cache/TXState.java   |  7 +++--
 .../cache/TXLastEventInTransactionUtilsTest.java   | 22 +++++++-------
 .../geode/internal/cache/wan/WANTestBase.java      | 15 ++++++++++
 ...lWANPropagation_PartitionedRegionDUnitTest.java | 34 ++++++++++++++++++++++
 6 files changed, 77 insertions(+), 28 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index 3a8e121..3a19d76 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -738,7 +738,8 @@ public class TXCommitMessage extends PooledDistributionMessage
     }
 
     for (EntryEventImpl ee : callbacks) {
-      boolean isLastTransactionEvent = isConfigError || ee.equals(lastTransactionEvent);
+      boolean isLastTransactionEvent = TXLastEventInTransactionUtils
+          .isLastTransactionEvent(isConfigError, lastTransactionEvent, ee);
       try {
         if (ee.getOperation().isDestroy()) {
           ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, true,
@@ -760,7 +761,8 @@ public class TXCommitMessage extends PooledDistributionMessage
   }
 
   EntryEventImpl getLastTransactionEvent(List<EntryEventImpl> callbacks) {
-    return TXLastEventInTransactionUtils.getLastTransactionEvent(callbacks, dm.getCache());
+    return TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(callbacks,
+        dm.getCache());
   }
 
   protected void processCacheRuntimeException(CacheRuntimeException problem) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtils.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtils.java
index 0884ae2..377e6ca 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtils.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtils.java
@@ -20,15 +20,10 @@ import java.util.ServiceConfigurationError;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.wan.GatewaySender;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 
 public class TXLastEventInTransactionUtils {
-  private static final Logger logger = LogService.getLogger();
-
   /**
    * @param callbacks list of events belonging to a transaction
    *
@@ -39,9 +34,8 @@ public class TXLastEventInTransactionUtils {
    *         events belong have different sets of senders that group transactions
    *         then it throws a ServiceConfigurationError exception.
    */
-  public static EntryEventImpl getLastTransactionEvent(List<EntryEventImpl> callbacks,
-      Cache cache)
-      throws ServiceConfigurationError {
+  public static EntryEventImpl getLastTransactionEventInGroupedTxForWANSender(
+      List<EntryEventImpl> callbacks, Cache cache) throws ServiceConfigurationError {
     if (checkNoSendersGroupTransactionEvents(callbacks, cache)) {
       return null;
     }
@@ -77,11 +71,7 @@ public class TXLastEventInTransactionUtils {
       Cache cache) throws ServiceConfigurationError {
     for (String senderId : getSenderIdsForEvents(callbacks)) {
       GatewaySender sender = cache.getGatewaySender(senderId);
-      if (sender == null) {
-        logger.error("No sender found for {}", senderId);
-        throw new ServiceConfigurationError("No information for senderId: " + senderId);
-      }
-      if (sender.mustGroupTransactionEvents()) {
+      if (sender != null && sender.mustGroupTransactionEvents()) {
         return false;
       }
     }
@@ -117,4 +107,9 @@ public class TXLastEventInTransactionUtils {
     }
     return sender.mustGroupTransactionEvents();
   }
+
+  static boolean isLastTransactionEvent(boolean isConfigError,
+      EntryEventImpl lastTransactionEvent, EntryEventImpl entryEvent) {
+    return isConfigError || lastTransactionEvent == null || entryEvent.equals(lastTransactionEvent);
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 5e687c2..bdb1eaf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -245,15 +245,16 @@ public class TXState implements TXStateInterface {
     boolean isConfigError = false;
     EntryEventImpl lastTransactionEvent = null;
     try {
-      lastTransactionEvent =
-          TXLastEventInTransactionUtils.getLastTransactionEvent(getPendingCallbacks(), getCache());
+      lastTransactionEvent = TXLastEventInTransactionUtils
+          .getLastTransactionEventInGroupedTxForWANSender(getPendingCallbacks(), getCache());
     } catch (ServiceConfigurationError ex) {
       logger.error(ex.getMessage());
       isConfigError = true;
     }
 
     for (EntryEventImpl ee : getPendingCallbacks()) {
-      boolean isLastTransactionEvent = isConfigError || ee.equals(lastTransactionEvent);
+      boolean isLastTransactionEvent = TXLastEventInTransactionUtils
+          .isLastTransactionEvent(isConfigError, lastTransactionEvent, ee);
       if (ee.getOperation().isDestroy()) {
         ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, true,
             isLastTransactionEvent);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtilsTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtilsTest.java
index d96af89..a2632bd 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtilsTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtilsTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
@@ -125,7 +126,7 @@ public class TXLastEventInTransactionUtilsTest {
     events.add(event2);
 
     EntryEventImpl lastTransactionEvent =
-        TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache);
+        TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(events, cache);
 
     assertEquals(null, lastTransactionEvent);
   }
@@ -140,7 +141,7 @@ public class TXLastEventInTransactionUtilsTest {
     events.add(event2);
 
     EntryEventImpl lastTransactionEvent =
-        TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache);
+        TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(events, cache);
 
     assertEquals(event2, lastTransactionEvent);
   }
@@ -155,7 +156,7 @@ public class TXLastEventInTransactionUtilsTest {
     events.add(event2);
 
     EntryEventImpl lastTransactionEvent =
-        TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache);
+        TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(events, cache);
 
     assertEquals(event2, lastTransactionEvent);
   }
@@ -169,13 +170,14 @@ public class TXLastEventInTransactionUtilsTest {
     events.add(event1);
     events.add(event2);
 
-    assertThatThrownBy(() -> TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache))
-        .isInstanceOf(ServiceConfigurationError.class)
-        .hasMessageContaining("Not all events go to the same senders that group transactions");
+    assertThatThrownBy(() -> TXLastEventInTransactionUtils
+        .getLastTransactionEventInGroupedTxForWANSender(events, cache))
+            .isInstanceOf(ServiceConfigurationError.class)
+            .hasMessageContaining("Not all events go to the same senders that group transactions");
   }
 
   @Test
-  public void getLastTransactionEventThrowsExceptionWhenSenderNotFound() {
+  public void getLastTransactionEventReturnsNullWhenSenderNotFound() {
     List<EntryEventImpl> events = new ArrayList();
     EntryEventImpl event1 = createMockEntryEventImpl(region8);
     EntryEventImpl event2 = createMockEntryEventImpl(region8);
@@ -183,9 +185,9 @@ public class TXLastEventInTransactionUtilsTest {
     events.add(event1);
     events.add(event2);
 
-    assertThatThrownBy(() -> TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache))
-        .isInstanceOf(ServiceConfigurationError.class)
-        .hasMessage("No information for senderId: sender5");
+    assertThat(
+        TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(events, cache))
+            .isNull();
   }
 
   private EntryEventImpl createMockEntryEventImpl(InternalRegion region) {
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 0df83ca..15e2efa 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -2302,6 +2302,21 @@ public class WANTestBase extends DistributedTestCase {
     }
   }
 
+  public static void doTxPuts(String regionName, int numPuts) {
+    try (
+        IgnoredException ignored = IgnoredException.addIgnoredException(InterruptedException.class);
+        IgnoredException ignored1 =
+            IgnoredException.addIgnoredException(GatewaySenderException.class)) {
+      Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
+      assertNotNull(r);
+      for (long i = 0; i < numPuts; i++) {
+        cache.getCacheTransactionManager().begin();
+        r.put(i, "Value_" + i);
+        cache.getCacheTransactionManager().commit();
+      }
+    }
+  }
+
   public static void doPutsSameKey(String regionName, int numPuts, String key) {
     IgnoredException exp1 =
         IgnoredException.addIgnoredException(InterruptedException.class.getName());
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
index 5604a96..c56d104 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
@@ -79,6 +79,40 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
   }
 
   @Test
+  public void testPartitionedSerialPropagationWithTXWhenSendersNotConfiguredOnAllServers() {
+    int lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    int nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100,
+        isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100,
+        isOffHeap()));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100,
+        isOffHeap()));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100,
+        isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.startSender("ln"));
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100,
+        isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100,
+        isOffHeap()));
+
+    vm7.invoke(() -> WANTestBase.doTxPuts(getTestMethodName() + "_PR", 1000));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
+  }
+
+  @Test
   public void testBothReplicatedAndPartitionedSerialPropagation() {
 
     Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));