You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2021/07/19 17:42:57 UTC
[geode] branch support/1.14 updated: GEODE-9404: Do not log error
message if sender is not configured. (#6659)
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.14 by this push:
new f477b35 GEODE-9404: Do not log error message if sender is not configured. (#6659)
f477b35 is described below
commit f477b3573bd08f515c690c3abf212546709e1346
Author: Eric Shu <es...@pivotal.io>
AuthorDate: Fri Jul 9 12:09:31 2021 -0700
GEODE-9404: Do not log error message if sender is not configured. (#6659)
* This is a normal case for serial WAN configuration. An error message should not be
logged when executing transactions.
* Log an error message only if some events in a tx are configured to group transactions but
others do not have a sender configured.
* Should not wait for lastTransactionEvent in a tx if no sender is configured or the sender
is not set to group transaction events (mustGroupTransactionEvents is false).
(cherry picked from commit f0e328ba3eb4a1b2b47bdca5d565b79e88fecdca)
---
.../geode/internal/cache/TXCommitMessage.java | 6 ++--
.../cache/TXLastEventInTransactionUtils.java | 21 +++++--------
.../org/apache/geode/internal/cache/TXState.java | 7 +++--
.../cache/TXLastEventInTransactionUtilsTest.java | 22 +++++++-------
.../geode/internal/cache/wan/WANTestBase.java | 15 ++++++++++
...lWANPropagation_PartitionedRegionDUnitTest.java | 34 ++++++++++++++++++++++
6 files changed, 77 insertions(+), 28 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index 3a8e121..3a19d76 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -738,7 +738,8 @@ public class TXCommitMessage extends PooledDistributionMessage
}
for (EntryEventImpl ee : callbacks) {
- boolean isLastTransactionEvent = isConfigError || ee.equals(lastTransactionEvent);
+ boolean isLastTransactionEvent = TXLastEventInTransactionUtils
+ .isLastTransactionEvent(isConfigError, lastTransactionEvent, ee);
try {
if (ee.getOperation().isDestroy()) {
ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, true,
@@ -760,7 +761,8 @@ public class TXCommitMessage extends PooledDistributionMessage
}
EntryEventImpl getLastTransactionEvent(List<EntryEventImpl> callbacks) {
- return TXLastEventInTransactionUtils.getLastTransactionEvent(callbacks, dm.getCache());
+ return TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(callbacks,
+ dm.getCache());
}
protected void processCacheRuntimeException(CacheRuntimeException problem) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtils.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtils.java
index 0884ae2..377e6ca 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtils.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtils.java
@@ -20,15 +20,10 @@ import java.util.ServiceConfigurationError;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.logging.log4j.Logger;
-
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.wan.GatewaySender;
-import org.apache.geode.logging.internal.log4j.api.LogService;
public class TXLastEventInTransactionUtils {
- private static final Logger logger = LogService.getLogger();
-
/**
* @param callbacks list of events belonging to a transaction
*
@@ -39,9 +34,8 @@ public class TXLastEventInTransactionUtils {
* events belong have different sets of senders that group transactions
* then it throws a ServiceConfigurationError exception.
*/
- public static EntryEventImpl getLastTransactionEvent(List<EntryEventImpl> callbacks,
- Cache cache)
- throws ServiceConfigurationError {
+ public static EntryEventImpl getLastTransactionEventInGroupedTxForWANSender(
+ List<EntryEventImpl> callbacks, Cache cache) throws ServiceConfigurationError {
if (checkNoSendersGroupTransactionEvents(callbacks, cache)) {
return null;
}
@@ -77,11 +71,7 @@ public class TXLastEventInTransactionUtils {
Cache cache) throws ServiceConfigurationError {
for (String senderId : getSenderIdsForEvents(callbacks)) {
GatewaySender sender = cache.getGatewaySender(senderId);
- if (sender == null) {
- logger.error("No sender found for {}", senderId);
- throw new ServiceConfigurationError("No information for senderId: " + senderId);
- }
- if (sender.mustGroupTransactionEvents()) {
+ if (sender != null && sender.mustGroupTransactionEvents()) {
return false;
}
}
@@ -117,4 +107,9 @@ public class TXLastEventInTransactionUtils {
}
return sender.mustGroupTransactionEvents();
}
+
+ static boolean isLastTransactionEvent(boolean isConfigError,
+ EntryEventImpl lastTransactionEvent, EntryEventImpl entryEvent) {
+ return isConfigError || lastTransactionEvent == null || entryEvent.equals(lastTransactionEvent);
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 5e687c2..bdb1eaf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -245,15 +245,16 @@ public class TXState implements TXStateInterface {
boolean isConfigError = false;
EntryEventImpl lastTransactionEvent = null;
try {
- lastTransactionEvent =
- TXLastEventInTransactionUtils.getLastTransactionEvent(getPendingCallbacks(), getCache());
+ lastTransactionEvent = TXLastEventInTransactionUtils
+ .getLastTransactionEventInGroupedTxForWANSender(getPendingCallbacks(), getCache());
} catch (ServiceConfigurationError ex) {
logger.error(ex.getMessage());
isConfigError = true;
}
for (EntryEventImpl ee : getPendingCallbacks()) {
- boolean isLastTransactionEvent = isConfigError || ee.equals(lastTransactionEvent);
+ boolean isLastTransactionEvent = TXLastEventInTransactionUtils
+ .isLastTransactionEvent(isConfigError, lastTransactionEvent, ee);
if (ee.getOperation().isDestroy()) {
ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, true,
isLastTransactionEvent);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtilsTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtilsTest.java
index d96af89..a2632bd 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtilsTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtilsTest.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.internal.cache;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
@@ -125,7 +126,7 @@ public class TXLastEventInTransactionUtilsTest {
events.add(event2);
EntryEventImpl lastTransactionEvent =
- TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache);
+ TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(events, cache);
assertEquals(null, lastTransactionEvent);
}
@@ -140,7 +141,7 @@ public class TXLastEventInTransactionUtilsTest {
events.add(event2);
EntryEventImpl lastTransactionEvent =
- TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache);
+ TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(events, cache);
assertEquals(event2, lastTransactionEvent);
}
@@ -155,7 +156,7 @@ public class TXLastEventInTransactionUtilsTest {
events.add(event2);
EntryEventImpl lastTransactionEvent =
- TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache);
+ TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(events, cache);
assertEquals(event2, lastTransactionEvent);
}
@@ -169,13 +170,14 @@ public class TXLastEventInTransactionUtilsTest {
events.add(event1);
events.add(event2);
- assertThatThrownBy(() -> TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache))
- .isInstanceOf(ServiceConfigurationError.class)
- .hasMessageContaining("Not all events go to the same senders that group transactions");
+ assertThatThrownBy(() -> TXLastEventInTransactionUtils
+ .getLastTransactionEventInGroupedTxForWANSender(events, cache))
+ .isInstanceOf(ServiceConfigurationError.class)
+ .hasMessageContaining("Not all events go to the same senders that group transactions");
}
@Test
- public void getLastTransactionEventThrowsExceptionWhenSenderNotFound() {
+ public void getLastTransactionEventReturnsNullWhenSenderNotFound() {
List<EntryEventImpl> events = new ArrayList();
EntryEventImpl event1 = createMockEntryEventImpl(region8);
EntryEventImpl event2 = createMockEntryEventImpl(region8);
@@ -183,9 +185,9 @@ public class TXLastEventInTransactionUtilsTest {
events.add(event1);
events.add(event2);
- assertThatThrownBy(() -> TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache))
- .isInstanceOf(ServiceConfigurationError.class)
- .hasMessage("No information for senderId: sender5");
+ assertThat(
+ TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(events, cache))
+ .isNull();
}
private EntryEventImpl createMockEntryEventImpl(InternalRegion region) {
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 0df83ca..15e2efa 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -2302,6 +2302,21 @@ public class WANTestBase extends DistributedTestCase {
}
}
+ public static void doTxPuts(String regionName, int numPuts) {
+ try (
+ IgnoredException ignored = IgnoredException.addIgnoredException(InterruptedException.class);
+ IgnoredException ignored1 =
+ IgnoredException.addIgnoredException(GatewaySenderException.class)) {
+ Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
+ assertNotNull(r);
+ for (long i = 0; i < numPuts; i++) {
+ cache.getCacheTransactionManager().begin();
+ r.put(i, "Value_" + i);
+ cache.getCacheTransactionManager().commit();
+ }
+ }
+ }
+
public static void doPutsSameKey(String regionName, int numPuts, String key) {
IgnoredException exp1 =
IgnoredException.addIgnoredException(InterruptedException.class.getName());
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
index 5604a96..c56d104 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
@@ -79,6 +79,40 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase
}
@Test
+ public void testPartitionedSerialPropagationWithTXWhenSendersNotConfiguredOnAllServers() {
+ int lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ int nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true));
+ vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true));
+
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100,
+ isOffHeap()));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100,
+ isOffHeap()));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100,
+ isOffHeap()));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100,
+ isOffHeap()));
+
+ vm4.invoke(() -> WANTestBase.startSender("ln"));
+
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100,
+ isOffHeap()));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100,
+ isOffHeap()));
+
+ vm7.invoke(() -> WANTestBase.doTxPuts(getTestMethodName() + "_PR", 1000));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
+ }
+
+ @Test
public void testBothReplicatedAndPartitionedSerialPropagation() {
Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));