You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mc...@apache.org on 2018/11/07 21:11:14 UTC
[geode] branch develop updated: Revert "GEODE-5729: when
DistributedCacheOperation needs 2 messages, should let (#2458)" (#2801)
This is an automated email from the ASF dual-hosted git repository.
mcmellawatt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new e9ea18e Revert "GEODE-5729: when DistributedCacheOperation needs 2 messages, should let (#2458)" (#2801)
e9ea18e is described below
commit e9ea18e18c85b977b91192d4edbb9a4e18b2643e
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Wed Nov 7 13:11:05 2018 -0800
Revert "GEODE-5729: when DistributedCacheOperation needs 2 messages, should let (#2458)" (#2801)
This reverts commit 49eb1c5fd13aefff0995d76ec7864c82d5730dd8.
---
.../internal/cache/DistributedCacheOperation.java | 28 ++----------
.../internal/cache/CacheOperationMessageTest.java | 50 ----------------------
2 files changed, 4 insertions(+), 74 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 1238fae..27490c0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -355,11 +355,6 @@ public abstract class DistributedCacheOperation {
logger.debug("Computed this filter routing: {}", filterRouting);
}
}
- // if there's one secondary bucket holder needs two messages, then send two messages to all
- // the secondary bcukets
- if (!twoMessages.isEmpty()) {
- twoMessages = recipients;
- }
}
// some members need PR notification of the change for client/wan
@@ -510,20 +505,7 @@ public abstract class DistributedCacheOperation {
}
Set failures = null;
- boolean inhibitAllNotifications = false;
- if (event instanceof EntryEventImpl) {
- inhibitAllNotifications = ((EntryEventImpl) event).inhibitAllNotifications();
- }
CacheOperationMessage msg = createMessage();
- if (!twoMessages.isEmpty() && event instanceof EntryEventImpl) {
- // If it's message for PR and needs 2 messages, let the distribution message not to
- // trigger callback
- ((EntryEventImpl) event).setInhibitAllNotifications(true);
- logger.info(
- "after setInhibitAllNotifications:recipients for {}: {} with adjunct messages to: {}",
- event, recipients,
- adjunctRecipients);
- }
initMessage(msg, this.processor);
if (DistributedCacheOperation.internalBeforePutOutgoing != null) {
@@ -540,8 +522,7 @@ public abstract class DistributedCacheOperation {
if (r.isUsedForPartitionedRegionBucket() && event.getOperation().isEntry()) {
PartitionMessage pm = ((EntryEventImpl) event).getPartitionMessage();
if (pm != null && pm.getSender() != null
- && !pm.getSender()
- .equals(r.getDistributionManager().getDistributionManagerId())) {
+ && !pm.getSender().equals(r.getDistributionManager().getDistributionManagerId())) {
// PR message sent by another member
ReplyProcessor21.setShortSevereAlertProcessing(true);
}
@@ -648,10 +629,6 @@ public abstract class DistributedCacheOperation {
.getCacheDistributionAdvisor().adviseCacheServers();
adjunctRecipientsWithNoCacheServer.removeAll(adviseCacheServers);
- // set to its original status
- if (event instanceof EntryEventImpl) {
- ((EntryEventImpl) event).setInhibitAllNotifications(inhibitAllNotifications);
- }
if (isPutAll) {
((BucketRegion) region).performPutAllAdjunctMessaging((DistributedPutAllOperation) this,
recipients, adjunctRecipients, filterRouting, this.processor);
@@ -1486,6 +1463,9 @@ public abstract class DistributedCacheOperation {
if (this.versionTag instanceof DiskVersionTag) {
bits |= PERSISTENT_TAG_MASK;
}
+ if (inhibitAllNotifications) {
+ bits |= INHIBIT_NOTIFICATIONS_MASK;
+ }
return bits;
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java
index 811cf5c..7beced8 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java
@@ -15,27 +15,15 @@
package org.apache.geode.internal.cache;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-
import org.junit.Test;
-import org.apache.geode.DataSerializer;
-import org.apache.geode.cache.Operation;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
-import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.internal.HeapDataOutputStream;
-import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.DistributedCacheOperation.CacheOperationMessage;
public class CacheOperationMessageTest {
@@ -57,42 +45,4 @@ public class CacheOperationMessageTest {
assertThat(mockCacheOperationMessage._mayAddToMultipleSerialGateways(mockDistributionManager))
.isTrue();
}
-
- @Test
- public void inhibitAllNotificationsShouldOnlybBeSetInextBits() throws Exception {
- // create an UpdateMessage for a bucket region
- UpdateOperation.UpdateMessage updateMessage = mock(UpdateOperation.UpdateMessage.class);
- byte[] memId = {1, 2, 3};
- EventID eventId = new EventID(memId, 11, 12, 13);
- BucketRegion br = mock(BucketRegion.class);
- PartitionedRegion pr = mock(PartitionedRegion.class);
- when(br.getPartitionedRegion()).thenReturn(pr);
- when(pr.isUsedForPartitionedRegionBucket()).thenReturn(true);
- InternalDistributedSystem system = mock(InternalDistributedSystem.class);
- DistributionConfig dc = mock(DistributionConfig.class);
- when(br.getSystem()).thenReturn(system);
- when(system.getConfig()).thenReturn(dc);
- when(dc.getDeltaPropagation()).thenReturn(false);
- KeyInfo keyInfo = new KeyInfo("key1", null, null);
- when(br.getKeyInfo(eq("key1"), any(), any())).thenReturn(keyInfo);
- EntryEventImpl event = EntryEventImpl.create(br, Operation.UPDATE, "key1", "value1",
- null, false, null, true, eventId);
- UpdateOperation operation = new UpdateOperation(event, 0);
- CacheOperationMessage message = operation.createMessage();
- event.setInhibitAllNotifications(true);
- operation.initMessage(message, null);
- assertTrue(message instanceof UpdateOperation.UpdateMessage);
- assertTrue(message.inhibitAllNotifications);
- assertFalse(message.hasDelta()); // inhibitAllNotifications should not be interpreted as
- // hasDelta
- HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
- DataSerializer.writeObject(message, hdos);
- byte[] outputArray = hdos.toByteArray();
- ByteArrayInputStream bais = new ByteArrayInputStream(outputArray);
- UpdateOperation.UpdateMessage message2 = DataSerializer.readObject(new DataInputStream(bais));
- assertTrue(message2 instanceof UpdateOperation.UpdateMessage);
- assertTrue(message2.inhibitAllNotifications);
- assertFalse(message2.hasDelta()); // inhibitAllNotifications should not be interpreted as
- // hasDelta
- }
}