You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mi...@apache.org on 2019/10/23 18:23:05 UTC
[geode] 01/01: Revert "Revert "GEODE-6807: cache adviseUpdate and
adviseAllEventsOrCached" (#4189)"
This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch revert-4189-revert-4086-feature/GEODE-6807
in repository https://gitbox.apache.org/repos/asf/geode.git
commit c083ad58b880d782a5f6b9e84936cdaa7d97504e
Author: Mario Ivanac <48...@users.noreply.github.com>
AuthorDate: Wed Oct 23 20:22:27 2019 +0200
Revert "Revert "GEODE-6807: cache adviseUpdate and adviseAllEventsOrCached" (#4189)"
This reverts commit e225ffcd5dc8f15ae59b46dc24baefcc253801f2.
---
.../CacheDistributionAdvisorConcurrentTest.java | 105 ++++++++++++
.../distributed/internal/DistributionAdvisor.java | 16 +-
.../internal/cache/CacheDistributionAdvisor.java | 98 +++++++++---
.../internal/cache/DistributedCacheOperation.java | 2 +-
.../geode/internal/cache/DistributedRegion.java | 7 +-
.../cache/CacheDistributionAdvisorTest.java | 176 +++++++++++++++++++++
6 files changed, 374 insertions(+), 30 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/CacheDistributionAdvisorConcurrentTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/CacheDistributionAdvisorConcurrentTest.java
new file mode 100644
index 0000000..9afbc52
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/CacheDistributionAdvisorConcurrentTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+
+import static org.apache.geode.test.concurrency.Utilities.availableProcessors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.distributed.internal.DistributionAdvisor;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.MemberAttributes;
+import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
+import org.apache.geode.test.concurrency.ConcurrentTestRunner;
+import org.apache.geode.test.concurrency.ParallelExecutor;
+
+
+@RunWith(ConcurrentTestRunner.class)
+public class CacheDistributionAdvisorConcurrentTest {
+ private final int count = availableProcessors() * 2;
+
+ @Test
+ public void getAdviseAllEventsOrCachedForConcurrentUpdateShouldSucceed(
+ ParallelExecutor executor) throws Exception {
+
+ DistributionAdvisor advisor = createCacheDistributionAdvisor();
+ CacheProfile profile = createCacheProfile();
+ advisor.putProfile(profile, true);
+
+ executor.inParallel(() -> {
+ ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+ }, count);
+ executor.execute();
+
+ assertTrue(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached()
+ .contains(profile.getDistributedMember()));
+ assertEquals(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached().size(), 1);
+
+ }
+
+ @Test
+ public void getAdviseUpdateForConcurrentUpdateShouldSucceed(
+ ParallelExecutor executor) throws Exception {
+
+ EntryEventImpl event = new EntryEventImpl();
+ event.setNewValue(null);
+ event.setOperation(Operation.CREATE);
+
+ DistributionAdvisor advisor = createCacheDistributionAdvisor();
+ CacheProfile profile = createCacheProfile();
+ advisor.putProfile(profile, true);
+
+ executor.inParallel(() -> {
+ ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+ }, count);
+ executor.execute();
+
+ assertTrue(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached()
+ .contains(profile.getDistributedMember()));
+ assertEquals(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached().size(), 1);
+
+ }
+
+ private DistributionAdvisor createCacheDistributionAdvisor() {
+ CacheDistributionAdvisee advisee = mock(CacheDistributionAdvisee.class);
+ CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+ when(advisee.getCancelCriterion()).thenReturn(cancelCriterion);
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ when(advisee.getDistributionManager()).thenReturn(distributionManager);
+ CacheDistributionAdvisor result =
+ CacheDistributionAdvisor.createCacheDistributionAdvisor(advisee);
+ when(advisee.getDistributionAdvisor()).thenReturn(result);
+ return result;
+ }
+
+ private CacheProfile createCacheProfile() throws UnknownHostException {
+ InternalDistributedMember member =
+ new InternalDistributedMember(InetAddress.getLocalHost(), 0, false,
+ false, MemberAttributes.DEFAULT);
+ return new CacheProfile(member, 1);
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
index 0a33f51..a2e701a 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
@@ -171,6 +171,12 @@ public class DistributionAdvisor {
private int numActiveProfiles = 0;
/**
+ * Profiles version number
+ */
+ protected volatile long profilesVersion = 0;
+
+
+ /**
* A collection of MembershipListeners that want to be notified when a profile is added to or
* removed from this DistributionAdvisor. The keys are membership listeners and the values are
* Boolean.TRUE.
@@ -1313,8 +1319,7 @@ public class DistributionAdvisor {
// must synchronize when modifying profile array
private synchronized boolean basicAddProfile(Profile p) {
// don't add more than once, but replace existing profile
- // try {
-
+ profilesVersion++;
int index = indexOfMemberId(p.getId());
if (index >= 0) {
Profile[] oldProfiles = profiles; // volatile read
@@ -1340,17 +1345,16 @@ public class DistributionAdvisor {
* Perform work of removing the given member from this advisor.
*/
private synchronized Profile basicRemoveMemberId(ProfileId id) {
- // try {
+
int i = indexOfMemberId(id);
if (i >= 0) {
+ profilesVersion++;
Profile profileRemoved = profiles[i];
basicRemoveIndex(i);
return profileRemoved;
} else
return null;
- // } finally {
- // Assert.assertTrue(-1 == indexOfMemberId(id));
- // }
+
}
private int indexOfMemberId(ProfileId id) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
index 1f3c0ba..725cc61 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
@@ -112,6 +112,19 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
// moved removedProfiles to DistributionAdvisor
+ private Set<InternalDistributedMember> adviseSetforAllEvents = Collections.emptySet();
+ private volatile long adviseAllEventsVersion = -1;
+
+ private Set<InternalDistributedMember> adviseSetforUpdate = Collections.emptySet();
+ private volatile long adviseUpdateVersion = -1;
+
+ private volatile long inRecoveryVersion = 0;
+ private volatile long adviseInRecoveryVersion = -1;
+
+ public synchronized void incInRecoveryVersion() {
+ inRecoveryVersion++;
+ }
+
/** Creates a new instance of CacheDistributionAdvisor */
protected CacheDistributionAdvisor(CacheDistributionAdvisee region) {
super(region);
@@ -140,19 +153,35 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
/**
* Returns a the set of members that either want all events or are caching data.
*
- * @param excludeInRecovery if true then members in recovery are excluded
*/
- private Set<InternalDistributedMember> adviseAllEventsOrCached(final boolean excludeInRecovery)
+ Set<InternalDistributedMember> adviseAllEventsOrCached()
throws IllegalStateException {
getAdvisee().getCancelCriterion().checkCancelInProgress(null);
- return adviseFilter(profile -> {
- assert profile instanceof CacheProfile;
- CacheProfile cp = (CacheProfile) profile;
- if (excludeInRecovery && cp.inRecovery) {
- return false;
+
+ // minimize volatile reads by copying ref to local var
+ long tempProfilesVersion = profilesVersion; // volatile read
+ long tempInRecoveryVersion = inRecoveryVersion; // volatile read
+
+ if (adviseAllEventsVersion != tempProfilesVersion
+ || adviseInRecoveryVersion != tempInRecoveryVersion) {
+ synchronized (adviseSetforAllEvents) {
+ if (adviseAllEventsVersion != tempProfilesVersion
+ || adviseInRecoveryVersion != tempInRecoveryVersion) {
+
+ adviseSetforAllEvents = Collections.unmodifiableSet(adviseFilter(profile -> {
+ CacheProfile cp = (CacheProfile) profile;
+ if (cp.getInRecovery()) {
+ return false;
+ }
+ return cp.cachedOrAllEventsWithListener();
+ }));
+ adviseAllEventsVersion = tempProfilesVersion;
+ adviseInRecoveryVersion = tempInRecoveryVersion;
+ }
}
- return cp.cachedOrAllEventsWithListener();
- });
+ }
+ return adviseSetforAllEvents;
+
}
/**
@@ -162,18 +191,30 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
Set adviseUpdate(final EntryEventImpl event) throws IllegalStateException {
if (event.hasNewValue() || event.getOperation().isPutAll()) {
// only need to distribute it to members that want all events or cache data
- return adviseAllEventsOrCached(true/* fixes 41147 */);
+ return adviseAllEventsOrCached();
} else {
// The new value is null so this is a create with a null value,
// in which case we only need to distribute this message to replicates
// or all events that are not a proxy or if a proxy has a listener
- return adviseFilter(profile -> {
- assert profile instanceof CacheProfile;
- CacheProfile cp = (CacheProfile) profile;
- DataPolicy dp = cp.dataPolicy;
- return dp.withReplication()
- || (cp.allEvents() && (dp.withStorage() || cp.hasCacheListener));
- });
+
+ // minimize volatile reads by copying ref to local var
+ long tempProfilesVersion = profilesVersion; // volatile read
+
+ if (adviseUpdateVersion != tempProfilesVersion) {
+ synchronized (adviseSetforUpdate) {
+ if (adviseUpdateVersion != tempProfilesVersion) {
+
+ adviseSetforUpdate = Collections.unmodifiableSet(adviseFilter(profile -> {
+ CacheProfile cp = (CacheProfile) profile;
+ DataPolicy dp = cp.getDataPolicy();
+ return dp.withReplication()
+ || (cp.allEvents() && (dp.withStorage() || cp.hasCacheListener));
+ }));
+ adviseUpdateVersion = tempProfilesVersion;
+ }
+ }
+ }
+ return adviseSetforUpdate;
}
}
@@ -250,7 +291,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
* Same as adviseGeneric except in recovery excluded.
*/
public Set<InternalDistributedMember> adviseCacheOp() {
- return adviseAllEventsOrCached(true);
+ return adviseAllEventsOrCached();
}
/*
@@ -260,7 +301,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
- return !cp.inRecovery;
+ return !cp.getInRecovery();
});
}
@@ -283,7 +324,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
assert profile instanceof CacheProfile;
CacheProfile prof = (CacheProfile) profile;
// if region in cache is in recovery, exclude
- if (prof.inRecovery) {
+ if (prof.getInRecovery()) {
return false;
}
@@ -364,7 +405,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
}
// if region in cache is in recovery, exclude
- if (profile.inRecovery) {
+ if (profile.getInRecovery()) {
uninitialized.add(profile.getDistributedMember());
continue;
}
@@ -453,12 +494,13 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
*/
public static class CacheProfile extends DistributionAdvisor.Profile {
public DataPolicy dataPolicy = DataPolicy.REPLICATE;
+
public InterestPolicy interestPolicy = InterestPolicy.DEFAULT;
public boolean hasCacheLoader = false;
public boolean hasCacheWriter = false;
public boolean hasCacheListener = false;
public Scope scope = Scope.DISTRIBUTED_NO_ACK;
- public boolean inRecovery = false;
+ private boolean inRecovery = false;
public Set<String> gatewaySenderIds = Collections.emptySet();
public Set<String> asyncEventQueueIds = Collections.emptySet();
/**
@@ -610,6 +652,18 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
return dataPolicy.withPersistence();
}
+ public boolean getInRecovery() {
+ return inRecovery;
+ };
+
+ public void setInRecovery(boolean recovery) {
+ inRecovery = recovery;
+ };
+
+ public DataPolicy getDataPolicy() {
+ return dataPolicy;
+ }
+
/** Set the profile data information that is stored in a short */
protected void setIntInfo(int s) {
if ((s & REPLICATE_MASK) != 0) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 0b13f2d..aa96f44 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -345,7 +345,7 @@ public abstract class DistributedCacheOperation {
try {
// Recipients with CacheOp
- Set<InternalDistributedMember> recipients = getRecipients();
+ Set<InternalDistributedMember> recipients = new HashSet<>(getRecipients());
Map<InternalDistributedMember, PersistentMemberID> persistentIds = null;
if (region.getDataPolicy().withPersistence()) {
persistentIds = region.getDistributionAdvisor().adviseInitializedPersistentMembers();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index f13bce3..d04882a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2214,7 +2214,12 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
cacheProfile.hasCacheListener = hasListener();
Assert.assertTrue(scope.isDistributed());
cacheProfile.scope = scope;
- cacheProfile.inRecovery = getImageState().getInRecovery();
+
+ boolean newInRecovery = getImageState().getInRecovery();
+ if (cacheProfile.getInRecovery() != newInRecovery) {
+ distAdvisor.incInRecoveryVersion();
+ }
+ cacheProfile.setInRecovery(newInRecovery);
cacheProfile.isPersistent = getDataPolicy().withPersistence();
cacheProfile.setSubscriptionAttributes(getSubscriptionAttributes());
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheDistributionAdvisorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheDistributionAdvisorTest.java
new file mode 100644
index 0000000..010e658
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheDistributionAdvisorTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.distributed.internal.DistributionAdvisor;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
+
+
+public class CacheDistributionAdvisorTest {
+ DistributionAdvisor advisor;
+
+ @Before
+ public void setUp() {
+ CacheDistributionAdvisee advisee = mock(CacheDistributionAdvisee.class);
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ when(advisee.getDistributionManager()).thenReturn(distributionManager);
+ CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+ when(advisee.getCancelCriterion()).thenReturn(cancelCriterion);
+ advisor =
+ CacheDistributionAdvisor.createCacheDistributionAdvisor(advisee);
+ when(advisee.getDistributionAdvisor()).thenReturn(advisor);
+
+ }
+
+ @Test
+ public void testAdviseAllEventsOrCached() {
+ CacheProfile profile = mock(CacheProfile.class);
+ InternalDistributedMember member = mock(InternalDistributedMember.class);
+ when(profile.getId()).thenReturn(member);
+ when(profile.getInRecovery()).thenReturn(false);
+ when(profile.cachedOrAllEventsWithListener()).thenReturn(true);
+ when(profile.getDistributedMember()).thenReturn(member);
+
+ advisor.putProfile(profile, true);
+ Set<InternalDistributedMember> targets1 =
+ ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+
+ Set<InternalDistributedMember> targets2 =
+ ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+
+ Set<InternalDistributedMember> targets3 =
+ ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+
+ verify(profile, times(1)).getInRecovery();
+ verify(profile, times(1)).cachedOrAllEventsWithListener();
+
+ }
+
+ @Test
+ public void testAdviseAllEventsOrCached2() {
+ CacheProfile profile = mock(CacheProfile.class);
+ InternalDistributedMember member = mock(InternalDistributedMember.class);
+ when(profile.getId()).thenReturn(member);
+ when(profile.getInRecovery()).thenReturn(false);
+ when(profile.cachedOrAllEventsWithListener()).thenReturn(true);
+ when(profile.getDistributedMember()).thenReturn(member);
+ CacheProfile profile2 = mock(CacheProfile.class);
+ InternalDistributedMember member2 = mock(InternalDistributedMember.class);
+ when(profile2.getId()).thenReturn(member2);
+ when(profile2.getInRecovery()).thenReturn(false);
+ when(profile2.cachedOrAllEventsWithListener()).thenReturn(true);
+ when(profile2.getDistributedMember()).thenReturn(member2);
+
+ advisor.putProfile(profile, true);
+ Set<InternalDistributedMember> targets1 =
+ ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+
+ advisor.putProfile(profile2, true);
+ Set<InternalDistributedMember> targets2 =
+ ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+
+ Set<InternalDistributedMember> targets3 =
+ ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
+
+ verify(profile, times(2)).getInRecovery();
+ verify(profile, times(2)).cachedOrAllEventsWithListener();
+ verify(profile2, times(1)).getInRecovery();
+ verify(profile2, times(1)).cachedOrAllEventsWithListener();
+
+ }
+
+
+ @Test
+ public void testAdviseUpdate() {
+ CacheProfile profile = mock(CacheProfile.class);
+ InternalDistributedMember member = mock(InternalDistributedMember.class);
+ EntryEventImpl event = mock(EntryEventImpl.class);
+ when(event.hasNewValue()).thenReturn(false);
+ when(event.getOperation()).thenReturn(Operation.CREATE);
+
+ when(profile.getId()).thenReturn(member);
+ when(profile.cachedOrAllEventsWithListener()).thenReturn(true);
+ when(profile.getDistributedMember()).thenReturn(member);
+ when(profile.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
+
+ advisor.putProfile(profile, true);
+ Set<InternalDistributedMember> targets1 =
+ ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+
+ Set<InternalDistributedMember> targets2 =
+ ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+
+ Set<InternalDistributedMember> targets3 =
+ ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+
+ verify(profile, times(1)).getDataPolicy();
+ }
+
+ @Test
+ public void testAdviseUpdate2() {
+ CacheProfile profile = mock(CacheProfile.class);
+ InternalDistributedMember member = mock(InternalDistributedMember.class);
+ EntryEventImpl event = mock(EntryEventImpl.class);
+ when(event.hasNewValue()).thenReturn(false);
+ when(event.getOperation()).thenReturn(Operation.CREATE);
+
+ when(profile.getId()).thenReturn(member);
+ when(profile.cachedOrAllEventsWithListener()).thenReturn(true);
+ when(profile.getDistributedMember()).thenReturn(member);
+ when(profile.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
+
+ CacheProfile profile2 = mock(CacheProfile.class);
+ InternalDistributedMember member2 = mock(InternalDistributedMember.class);
+ when(profile2.getId()).thenReturn(member2);
+ when(profile2.cachedOrAllEventsWithListener()).thenReturn(true);
+ when(profile2.getDistributedMember()).thenReturn(member2);
+ when(profile2.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
+
+ when(event.hasNewValue()).thenReturn(false);
+ when(event.getOperation()).thenReturn(Operation.CREATE);
+
+ advisor.putProfile(profile, true);
+ Set<InternalDistributedMember> targets1 =
+ ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+
+ advisor.putProfile(profile2, true);
+ Set<InternalDistributedMember> targets2 =
+ ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+
+ Set<InternalDistributedMember> targets3 =
+ ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
+
+ verify(profile, times(2)).getDataPolicy();
+ verify(profile2, times(1)).getDataPolicy();
+
+ }
+
+
+}