You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mc...@apache.org on 2018/11/07 21:11:14 UTC

[geode] branch develop updated: Revert "GEODE-5729: when DistributedCacheOperation needs 2 messages, should let (#2458)" (#2801)

This is an automated email from the ASF dual-hosted git repository.

mcmellawatt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new e9ea18e  Revert "GEODE-5729: when DistributedCacheOperation needs 2 messages, should let (#2458)" (#2801)
e9ea18e is described below

commit e9ea18e18c85b977b91192d4edbb9a4e18b2643e
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Wed Nov 7 13:11:05 2018 -0800

    Revert "GEODE-5729: when DistributedCacheOperation needs 2 messages, should let (#2458)" (#2801)
    
    This reverts commit 49eb1c5fd13aefff0995d76ec7864c82d5730dd8.
---
 .../internal/cache/DistributedCacheOperation.java  | 28 ++----------
 .../internal/cache/CacheOperationMessageTest.java  | 50 ----------------------
 2 files changed, 4 insertions(+), 74 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 1238fae..27490c0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -355,11 +355,6 @@ public abstract class DistributedCacheOperation {
             logger.debug("Computed this filter routing: {}", filterRouting);
           }
         }
-        // if there's one secondary bucket holder needs two messages, then send two messages to all
-        // the secondary bcukets
-        if (!twoMessages.isEmpty()) {
-          twoMessages = recipients;
-        }
       }
 
       // some members need PR notification of the change for client/wan
@@ -510,20 +505,7 @@ public abstract class DistributedCacheOperation {
         }
 
         Set failures = null;
-        boolean inhibitAllNotifications = false;
-        if (event instanceof EntryEventImpl) {
-          inhibitAllNotifications = ((EntryEventImpl) event).inhibitAllNotifications();
-        }
         CacheOperationMessage msg = createMessage();
-        if (!twoMessages.isEmpty() && event instanceof EntryEventImpl) {
-          // If it's message for PR and needs 2 messages, let the distribution message not to
-          // trigger callback
-          ((EntryEventImpl) event).setInhibitAllNotifications(true);
-          logger.info(
-              "after setInhibitAllNotifications:recipients for {}: {} with adjunct messages to: {}",
-              event, recipients,
-              adjunctRecipients);
-        }
         initMessage(msg, this.processor);
 
         if (DistributedCacheOperation.internalBeforePutOutgoing != null) {
@@ -540,8 +522,7 @@ public abstract class DistributedCacheOperation {
           if (r.isUsedForPartitionedRegionBucket() && event.getOperation().isEntry()) {
             PartitionMessage pm = ((EntryEventImpl) event).getPartitionMessage();
             if (pm != null && pm.getSender() != null
-                && !pm.getSender()
-                    .equals(r.getDistributionManager().getDistributionManagerId())) {
+                && !pm.getSender().equals(r.getDistributionManager().getDistributionManagerId())) {
               // PR message sent by another member
               ReplyProcessor21.setShortSevereAlertProcessing(true);
             }
@@ -648,10 +629,6 @@ public abstract class DistributedCacheOperation {
               .getCacheDistributionAdvisor().adviseCacheServers();
           adjunctRecipientsWithNoCacheServer.removeAll(adviseCacheServers);
 
-          // set to its original status
-          if (event instanceof EntryEventImpl) {
-            ((EntryEventImpl) event).setInhibitAllNotifications(inhibitAllNotifications);
-          }
           if (isPutAll) {
             ((BucketRegion) region).performPutAllAdjunctMessaging((DistributedPutAllOperation) this,
                 recipients, adjunctRecipients, filterRouting, this.processor);
@@ -1486,6 +1463,9 @@ public abstract class DistributedCacheOperation {
       if (this.versionTag instanceof DiskVersionTag) {
         bits |= PERSISTENT_TAG_MASK;
       }
+      if (inhibitAllNotifications) {
+        bits |= INHIBIT_NOTIFICATIONS_MASK;
+      }
       return bits;
     }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java
index 811cf5c..7beced8 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java
@@ -15,27 +15,15 @@
 package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-
 import org.junit.Test;
 
-import org.apache.geode.DataSerializer;
-import org.apache.geode.cache.Operation;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
-import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.internal.HeapDataOutputStream;
-import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.DistributedCacheOperation.CacheOperationMessage;
 
 public class CacheOperationMessageTest {
@@ -57,42 +45,4 @@ public class CacheOperationMessageTest {
     assertThat(mockCacheOperationMessage._mayAddToMultipleSerialGateways(mockDistributionManager))
         .isTrue();
   }
-
-  @Test
-  public void inhibitAllNotificationsShouldOnlybBeSetInextBits() throws Exception {
-    // create an UpdateMessage for a bucket region
-    UpdateOperation.UpdateMessage updateMessage = mock(UpdateOperation.UpdateMessage.class);
-    byte[] memId = {1, 2, 3};
-    EventID eventId = new EventID(memId, 11, 12, 13);
-    BucketRegion br = mock(BucketRegion.class);
-    PartitionedRegion pr = mock(PartitionedRegion.class);
-    when(br.getPartitionedRegion()).thenReturn(pr);
-    when(pr.isUsedForPartitionedRegionBucket()).thenReturn(true);
-    InternalDistributedSystem system = mock(InternalDistributedSystem.class);
-    DistributionConfig dc = mock(DistributionConfig.class);
-    when(br.getSystem()).thenReturn(system);
-    when(system.getConfig()).thenReturn(dc);
-    when(dc.getDeltaPropagation()).thenReturn(false);
-    KeyInfo keyInfo = new KeyInfo("key1", null, null);
-    when(br.getKeyInfo(eq("key1"), any(), any())).thenReturn(keyInfo);
-    EntryEventImpl event = EntryEventImpl.create(br, Operation.UPDATE, "key1", "value1",
-        null, false, null, true, eventId);
-    UpdateOperation operation = new UpdateOperation(event, 0);
-    CacheOperationMessage message = operation.createMessage();
-    event.setInhibitAllNotifications(true);
-    operation.initMessage(message, null);
-    assertTrue(message instanceof UpdateOperation.UpdateMessage);
-    assertTrue(message.inhibitAllNotifications);
-    assertFalse(message.hasDelta()); // inhibitAllNotifications should not be interpreted as
-                                     // hasDelta
-    HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
-    DataSerializer.writeObject(message, hdos);
-    byte[] outputArray = hdos.toByteArray();
-    ByteArrayInputStream bais = new ByteArrayInputStream(outputArray);
-    UpdateOperation.UpdateMessage message2 = DataSerializer.readObject(new DataInputStream(bais));
-    assertTrue(message2 instanceof UpdateOperation.UpdateMessage);
-    assertTrue(message2.inhibitAllNotifications);
-    assertFalse(message2.hasDelta()); // inhibitAllNotifications should not be interpreted as
-                                      // hasDelta
-  }
 }