You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mi...@apache.org on 2019/10/23 18:23:05 UTC

[geode] 01/01: Revert "Revert "GEODE-6807: cache adviseUpdate and adviseAllEventsOrCached" (#4189)"

This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch revert-4189-revert-4086-feature/GEODE-6807
in repository https://gitbox.apache.org/repos/asf/geode.git

commit c083ad58b880d782a5f6b9e84936cdaa7d97504e
Author: Mario Ivanac <48...@users.noreply.github.com>
AuthorDate: Wed Oct 23 20:22:27 2019 +0200

    Revert "Revert "GEODE-6807: cache adviseUpdate and adviseAllEventsOrCached" (#4189)"
    
    This reverts commit e225ffcd5dc8f15ae59b46dc24baefcc253801f2.
---
 .../CacheDistributionAdvisorConcurrentTest.java    | 105 ++++++++++++
 .../distributed/internal/DistributionAdvisor.java  |  16 +-
 .../internal/cache/CacheDistributionAdvisor.java   |  98 +++++++++---
 .../internal/cache/DistributedCacheOperation.java  |   2 +-
 .../geode/internal/cache/DistributedRegion.java    |   7 +-
 .../cache/CacheDistributionAdvisorTest.java        | 176 +++++++++++++++++++++
 6 files changed, 374 insertions(+), 30 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/CacheDistributionAdvisorConcurrentTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/CacheDistributionAdvisorConcurrentTest.java
new file mode 100644
index 0000000..9afbc52
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/CacheDistributionAdvisorConcurrentTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+
+import static org.apache.geode.test.concurrency.Utilities.availableProcessors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.distributed.internal.DistributionAdvisor;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.MemberAttributes;
+import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
+import org.apache.geode.test.concurrency.ConcurrentTestRunner;
+import org.apache.geode.test.concurrency.ParallelExecutor;
+
+
+@RunWith(ConcurrentTestRunner.class)
+public class CacheDistributionAdvisorConcurrentTest {
+  private final int count = availableProcessors() * 2;
+
+  @Test
+  public void getAdviseAllEventsOrCachedForConcurrentUpdateShouldSucceed(
+      ParallelExecutor executor) throws Exception {
+
+    DistributionAdvisor advisor = createCacheDistributionAdvisor();
+    CacheProfile profile = createCacheProfile();
+    advisor.putProfile(profile, true);
+
+    executor.inParallel(() -> {
+      ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+    }, count);
+    executor.execute();
+
+    assertTrue(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached()
+        .contains(profile.getDistributedMember()));
+    assertEquals(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached().size(), 1);
+
+  }
+
+  @Test
+  public void getAdviseUpdateForConcurrentUpdateShouldSucceed(
+      ParallelExecutor executor) throws Exception {
+
+    EntryEventImpl event = new EntryEventImpl();
+    event.setNewValue(null);
+    event.setOperation(Operation.CREATE);
+
+    DistributionAdvisor advisor = createCacheDistributionAdvisor();
+    CacheProfile profile = createCacheProfile();
+    advisor.putProfile(profile, true);
+
+    executor.inParallel(() -> {
+      ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+    }, count);
+    executor.execute();
+
+    assertTrue(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached()
+        .contains(profile.getDistributedMember()));
+    assertEquals(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached().size(), 1);
+
+  }
+
+  private DistributionAdvisor createCacheDistributionAdvisor() {
+    CacheDistributionAdvisee advisee = mock(CacheDistributionAdvisee.class);
+    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+    when(advisee.getCancelCriterion()).thenReturn(cancelCriterion);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    when(advisee.getDistributionManager()).thenReturn(distributionManager);
+    CacheDistributionAdvisor result =
+        CacheDistributionAdvisor.createCacheDistributionAdvisor(advisee);
+    when(advisee.getDistributionAdvisor()).thenReturn(result);
+    return result;
+  }
+
+  private CacheProfile createCacheProfile() throws UnknownHostException {
+    InternalDistributedMember member =
+        new InternalDistributedMember(InetAddress.getLocalHost(), 0, false,
+            false, MemberAttributes.DEFAULT);
+    return new CacheProfile(member, 1);
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
index 0a33f51..a2e701a 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
@@ -171,6 +171,12 @@ public class DistributionAdvisor {
   private int numActiveProfiles = 0;
 
   /**
+   * Profiles version number
+   */
+  protected volatile long profilesVersion = 0;
+
+
+  /**
    * A collection of MembershipListeners that want to be notified when a profile is added to or
    * removed from this DistributionAdvisor. The keys are membership listeners and the values are
    * Boolean.TRUE.
@@ -1313,8 +1319,7 @@ public class DistributionAdvisor {
   // must synchronize when modifying profile array
   private synchronized boolean basicAddProfile(Profile p) {
     // don't add more than once, but replace existing profile
-    // try {
-
+    profilesVersion++;
     int index = indexOfMemberId(p.getId());
     if (index >= 0) {
       Profile[] oldProfiles = profiles; // volatile read
@@ -1340,17 +1345,16 @@ public class DistributionAdvisor {
    * Perform work of removing the given member from this advisor.
    */
   private synchronized Profile basicRemoveMemberId(ProfileId id) {
-    // try {
+
     int i = indexOfMemberId(id);
     if (i >= 0) {
+      profilesVersion++;
       Profile profileRemoved = profiles[i];
       basicRemoveIndex(i);
       return profileRemoved;
     } else
       return null;
-    // } finally {
-    // Assert.assertTrue(-1 == indexOfMemberId(id));
-    // }
+
   }
 
   private int indexOfMemberId(ProfileId id) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
index 1f3c0ba..725cc61 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
@@ -112,6 +112,19 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
 
   // moved removedProfiles to DistributionAdvisor
 
+  private Set<InternalDistributedMember> adviseSetforAllEvents = Collections.emptySet();
+  private volatile long adviseAllEventsVersion = -1;
+
+  private Set<InternalDistributedMember> adviseSetforUpdate = Collections.emptySet();
+  private volatile long adviseUpdateVersion = -1;
+
+  private volatile long inRecoveryVersion = 0;
+  private volatile long adviseInRecoveryVersion = -1;
+
+  public synchronized void incInRecoveryVersion() {
+    inRecoveryVersion++;
+  }
+
   /** Creates a new instance of CacheDistributionAdvisor */
   protected CacheDistributionAdvisor(CacheDistributionAdvisee region) {
     super(region);
@@ -140,19 +153,35 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
   /**
    * Returns a the set of members that either want all events or are caching data.
    *
-   * @param excludeInRecovery if true then members in recovery are excluded
    */
-  private Set<InternalDistributedMember> adviseAllEventsOrCached(final boolean excludeInRecovery)
+  Set<InternalDistributedMember> adviseAllEventsOrCached()
       throws IllegalStateException {
     getAdvisee().getCancelCriterion().checkCancelInProgress(null);
-    return adviseFilter(profile -> {
-      assert profile instanceof CacheProfile;
-      CacheProfile cp = (CacheProfile) profile;
-      if (excludeInRecovery && cp.inRecovery) {
-        return false;
+
+    // minimize volatile reads by copying ref to local var
+    long tempProfilesVersion = profilesVersion; // volatile read
+    long tempInRecoveryVersion = inRecoveryVersion; // volatile read
+
+    if (adviseAllEventsVersion != tempProfilesVersion
+        || adviseInRecoveryVersion != tempInRecoveryVersion) {
+      synchronized (adviseSetforAllEvents) {
+        if (adviseAllEventsVersion != tempProfilesVersion
+            || adviseInRecoveryVersion != tempInRecoveryVersion) {
+
+          adviseSetforAllEvents = Collections.unmodifiableSet(adviseFilter(profile -> {
+            CacheProfile cp = (CacheProfile) profile;
+            if (cp.getInRecovery()) {
+              return false;
+            }
+            return cp.cachedOrAllEventsWithListener();
+          }));
+          adviseAllEventsVersion = tempProfilesVersion;
+          adviseInRecoveryVersion = tempInRecoveryVersion;
+        }
       }
-      return cp.cachedOrAllEventsWithListener();
-    });
+    }
+    return adviseSetforAllEvents;
+
   }
 
   /**
@@ -162,18 +191,30 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
   Set adviseUpdate(final EntryEventImpl event) throws IllegalStateException {
     if (event.hasNewValue() || event.getOperation().isPutAll()) {
       // only need to distribute it to members that want all events or cache data
-      return adviseAllEventsOrCached(true/* fixes 41147 */);
+      return adviseAllEventsOrCached();
     } else {
       // The new value is null so this is a create with a null value,
       // in which case we only need to distribute this message to replicates
       // or all events that are not a proxy or if a proxy has a listener
-      return adviseFilter(profile -> {
-        assert profile instanceof CacheProfile;
-        CacheProfile cp = (CacheProfile) profile;
-        DataPolicy dp = cp.dataPolicy;
-        return dp.withReplication()
-            || (cp.allEvents() && (dp.withStorage() || cp.hasCacheListener));
-      });
+
+      // minimize volatile reads by copying ref to local var
+      long tempProfilesVersion = profilesVersion; // volatile read
+
+      if (adviseUpdateVersion != tempProfilesVersion) {
+        synchronized (adviseSetforUpdate) {
+          if (adviseUpdateVersion != tempProfilesVersion) {
+
+            adviseSetforUpdate = Collections.unmodifiableSet(adviseFilter(profile -> {
+              CacheProfile cp = (CacheProfile) profile;
+              DataPolicy dp = cp.getDataPolicy();
+              return dp.withReplication()
+                  || (cp.allEvents() && (dp.withStorage() || cp.hasCacheListener));
+            }));
+            adviseUpdateVersion = tempProfilesVersion;
+          }
+        }
+      }
+      return adviseSetforUpdate;
     }
   }
 
@@ -250,7 +291,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
    * Same as adviseGeneric except in recovery excluded.
    */
   public Set<InternalDistributedMember> adviseCacheOp() {
-    return adviseAllEventsOrCached(true);
+    return adviseAllEventsOrCached();
   }
 
   /*
@@ -260,7 +301,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
     return adviseFilter(profile -> {
       assert profile instanceof CacheProfile;
       CacheProfile cp = (CacheProfile) profile;
-      return !cp.inRecovery;
+      return !cp.getInRecovery();
     });
   }
 
@@ -283,7 +324,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
       assert profile instanceof CacheProfile;
       CacheProfile prof = (CacheProfile) profile;
       // if region in cache is in recovery, exclude
-      if (prof.inRecovery) {
+      if (prof.getInRecovery()) {
         return false;
       }
 
@@ -364,7 +405,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
       }
 
       // if region in cache is in recovery, exclude
-      if (profile.inRecovery) {
+      if (profile.getInRecovery()) {
         uninitialized.add(profile.getDistributedMember());
         continue;
       }
@@ -453,12 +494,13 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
    */
   public static class CacheProfile extends DistributionAdvisor.Profile {
     public DataPolicy dataPolicy = DataPolicy.REPLICATE;
+
     public InterestPolicy interestPolicy = InterestPolicy.DEFAULT;
     public boolean hasCacheLoader = false;
     public boolean hasCacheWriter = false;
     public boolean hasCacheListener = false;
     public Scope scope = Scope.DISTRIBUTED_NO_ACK;
-    public boolean inRecovery = false;
+    private boolean inRecovery = false;
     public Set<String> gatewaySenderIds = Collections.emptySet();
     public Set<String> asyncEventQueueIds = Collections.emptySet();
     /**
@@ -610,6 +652,18 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
       return dataPolicy.withPersistence();
     }
 
+    public boolean getInRecovery() {
+      return inRecovery;
+    };
+
+    public void setInRecovery(boolean recovery) {
+      inRecovery = recovery;
+    };
+
+    public DataPolicy getDataPolicy() {
+      return dataPolicy;
+    }
+
     /** Set the profile data information that is stored in a short */
     protected void setIntInfo(int s) {
       if ((s & REPLICATE_MASK) != 0) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 0b13f2d..aa96f44 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -345,7 +345,7 @@ public abstract class DistributedCacheOperation {
 
     try {
       // Recipients with CacheOp
-      Set<InternalDistributedMember> recipients = getRecipients();
+      Set<InternalDistributedMember> recipients = new HashSet<>(getRecipients());
       Map<InternalDistributedMember, PersistentMemberID> persistentIds = null;
       if (region.getDataPolicy().withPersistence()) {
         persistentIds = region.getDistributionAdvisor().adviseInitializedPersistentMembers();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index f13bce3..d04882a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2214,7 +2214,12 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
     cacheProfile.hasCacheListener = hasListener();
     Assert.assertTrue(scope.isDistributed());
     cacheProfile.scope = scope;
-    cacheProfile.inRecovery = getImageState().getInRecovery();
+
+    boolean newInRecovery = getImageState().getInRecovery();
+    if (cacheProfile.getInRecovery() != newInRecovery) {
+      distAdvisor.incInRecoveryVersion();
+    }
+    cacheProfile.setInRecovery(newInRecovery);
     cacheProfile.isPersistent = getDataPolicy().withPersistence();
     cacheProfile.setSubscriptionAttributes(getSubscriptionAttributes());
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheDistributionAdvisorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheDistributionAdvisorTest.java
new file mode 100644
index 0000000..010e658
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheDistributionAdvisorTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.distributed.internal.DistributionAdvisor;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
+
+
+public class CacheDistributionAdvisorTest {
+  DistributionAdvisor advisor;
+
+  @Before
+  public void setUp() {
+    CacheDistributionAdvisee advisee = mock(CacheDistributionAdvisee.class);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    when(advisee.getDistributionManager()).thenReturn(distributionManager);
+    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+    when(advisee.getCancelCriterion()).thenReturn(cancelCriterion);
+    advisor =
+        CacheDistributionAdvisor.createCacheDistributionAdvisor(advisee);
+    when(advisee.getDistributionAdvisor()).thenReturn(advisor);
+
+  }
+
+  @Test
+  public void testAdviseAllEventsOrCached() {
+    CacheProfile profile = mock(CacheProfile.class);
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    when(profile.getId()).thenReturn(member);
+    when(profile.getInRecovery()).thenReturn(false);
+    when(profile.cachedOrAllEventsWithListener()).thenReturn(true);
+    when(profile.getDistributedMember()).thenReturn(member);
+
+    advisor.putProfile(profile, true);
+    Set<InternalDistributedMember> targets1 =
+        ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+
+    Set<InternalDistributedMember> targets2 =
+        ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+
+    Set<InternalDistributedMember> targets3 =
+        ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+
+    verify(profile, times(1)).getInRecovery();
+    verify(profile, times(1)).cachedOrAllEventsWithListener();
+
+  }
+
+  @Test
+  public void testAdviseAllEventsOrCached2() {
+    CacheProfile profile = mock(CacheProfile.class);
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    when(profile.getId()).thenReturn(member);
+    when(profile.getInRecovery()).thenReturn(false);
+    when(profile.cachedOrAllEventsWithListener()).thenReturn(true);
+    when(profile.getDistributedMember()).thenReturn(member);
+    CacheProfile profile2 = mock(CacheProfile.class);
+    InternalDistributedMember member2 = mock(InternalDistributedMember.class);
+    when(profile2.getId()).thenReturn(member2);
+    when(profile2.getInRecovery()).thenReturn(false);
+    when(profile2.cachedOrAllEventsWithListener()).thenReturn(true);
+    when(profile2.getDistributedMember()).thenReturn(member2);
+
+    advisor.putProfile(profile, true);
+    Set<InternalDistributedMember> targets1 =
+        ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+
+    advisor.putProfile(profile2, true);
+    Set<InternalDistributedMember> targets2 =
+        ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+
+    Set<InternalDistributedMember> targets3 =
+        ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+
+    verify(profile, times(2)).getInRecovery();
+    verify(profile, times(2)).cachedOrAllEventsWithListener();
+    verify(profile2, times(1)).getInRecovery();
+    verify(profile2, times(1)).cachedOrAllEventsWithListener();
+
+  }
+
+
+  @Test
+  public void testAdviseUpdate() {
+    CacheProfile profile = mock(CacheProfile.class);
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    EntryEventImpl event = mock(EntryEventImpl.class);
+    when(event.hasNewValue()).thenReturn(false);
+    when(event.getOperation()).thenReturn(Operation.CREATE);
+
+    when(profile.getId()).thenReturn(member);
+    when(profile.cachedOrAllEventsWithListener()).thenReturn(true);
+    when(profile.getDistributedMember()).thenReturn(member);
+    when(profile.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
+
+    advisor.putProfile(profile, true);
+    Set<InternalDistributedMember> targets1 =
+        ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+
+    Set<InternalDistributedMember> targets2 =
+        ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+
+    Set<InternalDistributedMember> targets3 =
+        ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+
+    verify(profile, times(1)).getDataPolicy();
+  }
+
+  @Test
+  public void testAdviseUpdate2() {
+    CacheProfile profile = mock(CacheProfile.class);
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    EntryEventImpl event = mock(EntryEventImpl.class);
+    when(event.hasNewValue()).thenReturn(false);
+    when(event.getOperation()).thenReturn(Operation.CREATE);
+
+    when(profile.getId()).thenReturn(member);
+    when(profile.cachedOrAllEventsWithListener()).thenReturn(true);
+    when(profile.getDistributedMember()).thenReturn(member);
+    when(profile.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
+
+    CacheProfile profile2 = mock(CacheProfile.class);
+    InternalDistributedMember member2 = mock(InternalDistributedMember.class);
+    when(profile2.getId()).thenReturn(member2);
+    when(profile2.cachedOrAllEventsWithListener()).thenReturn(true);
+    when(profile2.getDistributedMember()).thenReturn(member2);
+    when(profile2.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
+
+    when(event.hasNewValue()).thenReturn(false);
+    when(event.getOperation()).thenReturn(Operation.CREATE);
+
+    advisor.putProfile(profile, true);
+    Set<InternalDistributedMember> targets1 =
+        ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+
+    advisor.putProfile(profile2, true);
+    Set<InternalDistributedMember> targets2 =
+        ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+
+    Set<InternalDistributedMember> targets3 =
+        ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+
+    verify(profile, times(2)).getDataPolicy();
+    verify(profile2, times(1)).getDataPolicy();
+
+  }
+
+
+}