You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mi...@apache.org on 2019/10/23 18:23:04 UTC

[geode] branch revert-4189-revert-4086-feature/GEODE-6807 created (now c083ad5)

This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a change to branch revert-4189-revert-4086-feature/GEODE-6807
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at c083ad5  Revert "Revert "GEODE-6807: cache adviseUpdate and adviseAllEventsOrCached" (#4189)"

This branch includes the following new commits:

     new c083ad5  Revert "Revert "GEODE-6807: cache adviseUpdate and adviseAllEventsOrCached" (#4189)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/01: Revert "Revert "GEODE-6807: cache adviseUpdate and adviseAllEventsOrCached" (#4189)"

Posted by mi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch revert-4189-revert-4086-feature/GEODE-6807
in repository https://gitbox.apache.org/repos/asf/geode.git

commit c083ad58b880d782a5f6b9e84936cdaa7d97504e
Author: Mario Ivanac <48...@users.noreply.github.com>
AuthorDate: Wed Oct 23 20:22:27 2019 +0200

    Revert "Revert "GEODE-6807: cache adviseUpdate and adviseAllEventsOrCached" (#4189)"
    
    This reverts commit e225ffcd5dc8f15ae59b46dc24baefcc253801f2.
---
 .../CacheDistributionAdvisorConcurrentTest.java    | 105 ++++++++++++
 .../distributed/internal/DistributionAdvisor.java  |  16 +-
 .../internal/cache/CacheDistributionAdvisor.java   |  98 +++++++++---
 .../internal/cache/DistributedCacheOperation.java  |   2 +-
 .../geode/internal/cache/DistributedRegion.java    |   7 +-
 .../cache/CacheDistributionAdvisorTest.java        | 176 +++++++++++++++++++++
 6 files changed, 374 insertions(+), 30 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/CacheDistributionAdvisorConcurrentTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/CacheDistributionAdvisorConcurrentTest.java
new file mode 100644
index 0000000..9afbc52
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/CacheDistributionAdvisorConcurrentTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+
+import static org.apache.geode.test.concurrency.Utilities.availableProcessors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.distributed.internal.DistributionAdvisor;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.MemberAttributes;
+import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
+import org.apache.geode.test.concurrency.ConcurrentTestRunner;
+import org.apache.geode.test.concurrency.ParallelExecutor;
+
+
+@RunWith(ConcurrentTestRunner.class)
+public class CacheDistributionAdvisorConcurrentTest {
+  private final int count = availableProcessors() * 2;
+
+  @Test
+  public void getAdviseAllEventsOrCachedForConcurrentUpdateShouldSucceed(
+      ParallelExecutor executor) throws Exception {
+
+    DistributionAdvisor advisor = createCacheDistributionAdvisor();
+    CacheProfile profile = createCacheProfile();
+    advisor.putProfile(profile, true);
+
+    executor.inParallel(() -> {
+      ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+    }, count);
+    executor.execute();
+
+    assertTrue(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached()
+        .contains(profile.getDistributedMember()));
+    assertEquals(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached().size(), 1);
+
+  }
+
+  @Test
+  public void getAdviseUpdateForConcurrentUpdateShouldSucceed(
+      ParallelExecutor executor) throws Exception {
+
+    EntryEventImpl event = new EntryEventImpl();
+    event.setNewValue(null);
+    event.setOperation(Operation.CREATE);
+
+    DistributionAdvisor advisor = createCacheDistributionAdvisor();
+    CacheProfile profile = createCacheProfile();
+    advisor.putProfile(profile, true);
+
+    executor.inParallel(() -> {
+      ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+    }, count);
+    executor.execute();
+
+    assertTrue(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached()
+        .contains(profile.getDistributedMember()));
+    assertEquals(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached().size(), 1);
+
+  }
+
+  private DistributionAdvisor createCacheDistributionAdvisor() {
+    CacheDistributionAdvisee advisee = mock(CacheDistributionAdvisee.class);
+    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+    when(advisee.getCancelCriterion()).thenReturn(cancelCriterion);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    when(advisee.getDistributionManager()).thenReturn(distributionManager);
+    CacheDistributionAdvisor result =
+        CacheDistributionAdvisor.createCacheDistributionAdvisor(advisee);
+    when(advisee.getDistributionAdvisor()).thenReturn(result);
+    return result;
+  }
+
+  private CacheProfile createCacheProfile() throws UnknownHostException {
+    InternalDistributedMember member =
+        new InternalDistributedMember(InetAddress.getLocalHost(), 0, false,
+            false, MemberAttributes.DEFAULT);
+    return new CacheProfile(member, 1);
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
index 0a33f51..a2e701a 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
@@ -171,6 +171,12 @@ public class DistributionAdvisor {
   private int numActiveProfiles = 0;
 
   /**
+   * Profiles version number; incremented whenever a profile is added, replaced or removed.
+   */
+  protected volatile long profilesVersion = 0;
+
+
+  /**
    * A collection of MembershipListeners that want to be notified when a profile is added to or
    * removed from this DistributionAdvisor. The keys are membership listeners and the values are
    * Boolean.TRUE.
@@ -1313,8 +1319,7 @@ public class DistributionAdvisor {
   // must synchronize when modifying profile array
   private synchronized boolean basicAddProfile(Profile p) {
     // don't add more than once, but replace existing profile
-    // try {
-
+    profilesVersion++;
     int index = indexOfMemberId(p.getId());
     if (index >= 0) {
       Profile[] oldProfiles = profiles; // volatile read
@@ -1340,17 +1345,16 @@ public class DistributionAdvisor {
    * Perform work of removing the given member from this advisor.
    */
   private synchronized Profile basicRemoveMemberId(ProfileId id) {
-    // try {
+
     int i = indexOfMemberId(id);
     if (i >= 0) {
+      profilesVersion++;
       Profile profileRemoved = profiles[i];
       basicRemoveIndex(i);
       return profileRemoved;
     } else
       return null;
-    // } finally {
-    // Assert.assertTrue(-1 == indexOfMemberId(id));
-    // }
+
   }
 
   private int indexOfMemberId(ProfileId id) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
index 1f3c0ba..725cc61 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
@@ -112,6 +112,19 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
 
   // moved removedProfiles to DistributionAdvisor
 
+  private Set<InternalDistributedMember> adviseSetforAllEvents = Collections.emptySet();
+  private volatile long adviseAllEventsVersion = -1;
+
+  private Set<InternalDistributedMember> adviseSetforUpdate = Collections.emptySet();
+  private volatile long adviseUpdateVersion = -1;
+
+  private volatile long inRecoveryVersion = 0;
+  private volatile long adviseInRecoveryVersion = -1;
+
+  public synchronized void incInRecoveryVersion() {
+    inRecoveryVersion++;
+  }
+
   /** Creates a new instance of CacheDistributionAdvisor */
   protected CacheDistributionAdvisor(CacheDistributionAdvisee region) {
     super(region);
@@ -140,19 +153,35 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
   /**
    * Returns a the set of members that either want all events or are caching data.
    *
-   * @param excludeInRecovery if true then members in recovery are excluded
    */
-  private Set<InternalDistributedMember> adviseAllEventsOrCached(final boolean excludeInRecovery)
+  Set<InternalDistributedMember> adviseAllEventsOrCached()
       throws IllegalStateException {
     getAdvisee().getCancelCriterion().checkCancelInProgress(null);
-    return adviseFilter(profile -> {
-      assert profile instanceof CacheProfile;
-      CacheProfile cp = (CacheProfile) profile;
-      if (excludeInRecovery && cp.inRecovery) {
-        return false;
+
+    // minimize volatile reads by copying ref to local var
+    long tempProfilesVersion = profilesVersion; // volatile read
+    long tempInRecoveryVersion = inRecoveryVersion; // volatile read
+
+    if (adviseAllEventsVersion != tempProfilesVersion
+        || adviseInRecoveryVersion != tempInRecoveryVersion) {
+      synchronized (adviseSetforAllEvents) {
+        if (adviseAllEventsVersion != tempProfilesVersion
+            || adviseInRecoveryVersion != tempInRecoveryVersion) {
+
+          adviseSetforAllEvents = Collections.unmodifiableSet(adviseFilter(profile -> {
+            CacheProfile cp = (CacheProfile) profile;
+            if (cp.getInRecovery()) {
+              return false;
+            }
+            return cp.cachedOrAllEventsWithListener();
+          }));
+          adviseAllEventsVersion = tempProfilesVersion;
+          adviseInRecoveryVersion = tempInRecoveryVersion;
+        }
       }
-      return cp.cachedOrAllEventsWithListener();
-    });
+    }
+    return adviseSetforAllEvents;
+
   }
 
   /**
@@ -162,18 +191,30 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
   Set adviseUpdate(final EntryEventImpl event) throws IllegalStateException {
     if (event.hasNewValue() || event.getOperation().isPutAll()) {
       // only need to distribute it to members that want all events or cache data
-      return adviseAllEventsOrCached(true/* fixes 41147 */);
+      return adviseAllEventsOrCached();
     } else {
       // The new value is null so this is a create with a null value,
       // in which case we only need to distribute this message to replicates
       // or all events that are not a proxy or if a proxy has a listener
-      return adviseFilter(profile -> {
-        assert profile instanceof CacheProfile;
-        CacheProfile cp = (CacheProfile) profile;
-        DataPolicy dp = cp.dataPolicy;
-        return dp.withReplication()
-            || (cp.allEvents() && (dp.withStorage() || cp.hasCacheListener));
-      });
+
+      // minimize volatile reads by copying ref to local var
+      long tempProfilesVersion = profilesVersion; // volatile read
+
+      if (adviseUpdateVersion != tempProfilesVersion) {
+        synchronized (adviseSetforUpdate) {
+          if (adviseUpdateVersion != tempProfilesVersion) {
+
+            adviseSetforUpdate = Collections.unmodifiableSet(adviseFilter(profile -> {
+              CacheProfile cp = (CacheProfile) profile;
+              DataPolicy dp = cp.getDataPolicy();
+              return dp.withReplication()
+                  || (cp.allEvents() && (dp.withStorage() || cp.hasCacheListener));
+            }));
+            adviseUpdateVersion = tempProfilesVersion;
+          }
+        }
+      }
+      return adviseSetforUpdate;
     }
   }
 
@@ -250,7 +291,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
    * Same as adviseGeneric except in recovery excluded.
    */
   public Set<InternalDistributedMember> adviseCacheOp() {
-    return adviseAllEventsOrCached(true);
+    return adviseAllEventsOrCached();
   }
 
   /*
@@ -260,7 +301,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
     return adviseFilter(profile -> {
       assert profile instanceof CacheProfile;
       CacheProfile cp = (CacheProfile) profile;
-      return !cp.inRecovery;
+      return !cp.getInRecovery();
     });
   }
 
@@ -283,7 +324,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
       assert profile instanceof CacheProfile;
       CacheProfile prof = (CacheProfile) profile;
       // if region in cache is in recovery, exclude
-      if (prof.inRecovery) {
+      if (prof.getInRecovery()) {
         return false;
       }
 
@@ -364,7 +405,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
       }
 
       // if region in cache is in recovery, exclude
-      if (profile.inRecovery) {
+      if (profile.getInRecovery()) {
         uninitialized.add(profile.getDistributedMember());
         continue;
       }
@@ -453,12 +494,13 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
    */
   public static class CacheProfile extends DistributionAdvisor.Profile {
     public DataPolicy dataPolicy = DataPolicy.REPLICATE;
+
     public InterestPolicy interestPolicy = InterestPolicy.DEFAULT;
     public boolean hasCacheLoader = false;
     public boolean hasCacheWriter = false;
     public boolean hasCacheListener = false;
     public Scope scope = Scope.DISTRIBUTED_NO_ACK;
-    public boolean inRecovery = false;
+    private boolean inRecovery = false;
     public Set<String> gatewaySenderIds = Collections.emptySet();
     public Set<String> asyncEventQueueIds = Collections.emptySet();
     /**
@@ -610,6 +652,18 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
       return dataPolicy.withPersistence();
     }
 
+    public boolean getInRecovery() {
+      return inRecovery;
+    };
+
+    public void setInRecovery(boolean recovery) {
+      inRecovery = recovery;
+    };
+
+    public DataPolicy getDataPolicy() {
+      return dataPolicy;
+    }
+
     /** Set the profile data information that is stored in a short */
     protected void setIntInfo(int s) {
       if ((s & REPLICATE_MASK) != 0) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 0b13f2d..aa96f44 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -345,7 +345,7 @@ public abstract class DistributedCacheOperation {
 
     try {
       // Recipients with CacheOp
-      Set<InternalDistributedMember> recipients = getRecipients();
+      Set<InternalDistributedMember> recipients = new HashSet<>(getRecipients());
       Map<InternalDistributedMember, PersistentMemberID> persistentIds = null;
       if (region.getDataPolicy().withPersistence()) {
         persistentIds = region.getDistributionAdvisor().adviseInitializedPersistentMembers();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index f13bce3..d04882a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2214,7 +2214,12 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
     cacheProfile.hasCacheListener = hasListener();
     Assert.assertTrue(scope.isDistributed());
     cacheProfile.scope = scope;
-    cacheProfile.inRecovery = getImageState().getInRecovery();
+
+    boolean newInRecovery = getImageState().getInRecovery();
+    if (cacheProfile.getInRecovery() != newInRecovery) {
+      distAdvisor.incInRecoveryVersion();
+    }
+    cacheProfile.setInRecovery(newInRecovery);
     cacheProfile.isPersistent = getDataPolicy().withPersistence();
     cacheProfile.setSubscriptionAttributes(getSubscriptionAttributes());
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheDistributionAdvisorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheDistributionAdvisorTest.java
new file mode 100644
index 0000000..010e658
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheDistributionAdvisorTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.distributed.internal.DistributionAdvisor;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
+
+
+public class CacheDistributionAdvisorTest {
+  DistributionAdvisor advisor;
+
+  @Before
+  public void setUp() {
+    CacheDistributionAdvisee advisee = mock(CacheDistributionAdvisee.class);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    when(advisee.getDistributionManager()).thenReturn(distributionManager);
+    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+    when(advisee.getCancelCriterion()).thenReturn(cancelCriterion);
+    advisor =
+        CacheDistributionAdvisor.createCacheDistributionAdvisor(advisee);
+    when(advisee.getDistributionAdvisor()).thenReturn(advisor);
+
+  }
+
+  @Test
+  public void testAdviseAllEventsOrCached() {
+    CacheProfile profile = mock(CacheProfile.class);
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    when(profile.getId()).thenReturn(member);
+    when(profile.getInRecovery()).thenReturn(false);
+    when(profile.cachedOrAllEventsWithListener()).thenReturn(true);
+    when(profile.getDistributedMember()).thenReturn(member);
+
+    advisor.putProfile(profile, true);
+    Set<InternalDistributedMember> targets1 =
+        ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+
+    Set<InternalDistributedMember> targets2 =
+        ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+
+    Set<InternalDistributedMember> targets3 =
+        ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+
+    verify(profile, times(1)).getInRecovery();
+    verify(profile, times(1)).cachedOrAllEventsWithListener();
+
+  }
+
+  @Test
+  public void testAdviseAllEventsOrCached2() {
+    CacheProfile profile = mock(CacheProfile.class);
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    when(profile.getId()).thenReturn(member);
+    when(profile.getInRecovery()).thenReturn(false);
+    when(profile.cachedOrAllEventsWithListener()).thenReturn(true);
+    when(profile.getDistributedMember()).thenReturn(member);
+    CacheProfile profile2 = mock(CacheProfile.class);
+    InternalDistributedMember member2 = mock(InternalDistributedMember.class);
+    when(profile2.getId()).thenReturn(member2);
+    when(profile2.getInRecovery()).thenReturn(false);
+    when(profile2.cachedOrAllEventsWithListener()).thenReturn(true);
+    when(profile2.getDistributedMember()).thenReturn(member2);
+
+    advisor.putProfile(profile, true);
+    Set<InternalDistributedMember> targets1 =
+        ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+
+    advisor.putProfile(profile2, true);
+    Set<InternalDistributedMember> targets2 =
+        ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+
+    Set<InternalDistributedMember> targets3 =
+        ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+
+    verify(profile, times(2)).getInRecovery();
+    verify(profile, times(2)).cachedOrAllEventsWithListener();
+    verify(profile2, times(1)).getInRecovery();
+    verify(profile2, times(1)).cachedOrAllEventsWithListener();
+
+  }
+
+
+  @Test
+  public void testAdviseUpdate() {
+    CacheProfile profile = mock(CacheProfile.class);
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    EntryEventImpl event = mock(EntryEventImpl.class);
+    when(event.hasNewValue()).thenReturn(false);
+    when(event.getOperation()).thenReturn(Operation.CREATE);
+
+    when(profile.getId()).thenReturn(member);
+    when(profile.cachedOrAllEventsWithListener()).thenReturn(true);
+    when(profile.getDistributedMember()).thenReturn(member);
+    when(profile.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
+
+    advisor.putProfile(profile, true);
+    Set<InternalDistributedMember> targets1 =
+        ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+
+    Set<InternalDistributedMember> targets2 =
+        ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+
+    Set<InternalDistributedMember> targets3 =
+        ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+
+    verify(profile, times(1)).getDataPolicy();
+  }
+
+  @Test
+  public void testAdviseUpdate2() {
+    CacheProfile profile = mock(CacheProfile.class);
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    EntryEventImpl event = mock(EntryEventImpl.class);
+    when(event.hasNewValue()).thenReturn(false);
+    when(event.getOperation()).thenReturn(Operation.CREATE);
+
+    when(profile.getId()).thenReturn(member);
+    when(profile.cachedOrAllEventsWithListener()).thenReturn(true);
+    when(profile.getDistributedMember()).thenReturn(member);
+    when(profile.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
+
+    CacheProfile profile2 = mock(CacheProfile.class);
+    InternalDistributedMember member2 = mock(InternalDistributedMember.class);
+    when(profile2.getId()).thenReturn(member2);
+    when(profile2.cachedOrAllEventsWithListener()).thenReturn(true);
+    when(profile2.getDistributedMember()).thenReturn(member2);
+    when(profile2.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
+
+    when(event.hasNewValue()).thenReturn(false);
+    when(event.getOperation()).thenReturn(Operation.CREATE);
+
+    advisor.putProfile(profile, true);
+    Set<InternalDistributedMember> targets1 =
+        ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+
+    advisor.putProfile(profile2, true);
+    Set<InternalDistributedMember> targets2 =
+        ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+
+    Set<InternalDistributedMember> targets3 =
+        ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+
+    verify(profile, times(2)).getDataPolicy();
+    verify(profile2, times(1)).getDataPolicy();
+
+  }
+
+
+}