You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ec...@apache.org on 2019/10/21 16:33:46 UTC

[geode] 01/01: Revert "GEODE-6807: cache adviseUpdate and adviseAllEventsOrCached"

This is an automated email from the ASF dual-hosted git repository.

echobravo pushed a commit to branch revert-4086-feature/GEODE-6807
in repository https://gitbox.apache.org/repos/asf/geode.git

commit a7a5ecdde06ee38c9a6d98e888a65084c283e8c6
Author: Ernie Burghardt <eb...@pivotal.io>
AuthorDate: Mon Oct 21 09:33:10 2019 -0700

    Revert "GEODE-6807: cache adviseUpdate and adviseAllEventsOrCached"
---
 .../CacheDistributionAdvisorConcurrentTest.java    | 105 ------------
 .../distributed/internal/DistributionAdvisor.java  |  16 +-
 .../internal/cache/CacheDistributionAdvisor.java   |  98 +++---------
 .../internal/cache/DistributedCacheOperation.java  |   2 +-
 .../geode/internal/cache/DistributedRegion.java    |   7 +-
 .../cache/CacheDistributionAdvisorTest.java        | 176 ---------------------
 6 files changed, 30 insertions(+), 374 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/CacheDistributionAdvisorConcurrentTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/CacheDistributionAdvisorConcurrentTest.java
deleted file mode 100644
index 9afbc52..0000000
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/CacheDistributionAdvisorConcurrentTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache;
-
-
-import static org.apache.geode.test.concurrency.Utilities.availableProcessors;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.cache.Operation;
-import org.apache.geode.distributed.internal.DistributionAdvisor;
-import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.distributed.internal.membership.MemberAttributes;
-import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
-import org.apache.geode.test.concurrency.ConcurrentTestRunner;
-import org.apache.geode.test.concurrency.ParallelExecutor;
-
-
-@RunWith(ConcurrentTestRunner.class)
-public class CacheDistributionAdvisorConcurrentTest {
-  private final int count = availableProcessors() * 2;
-
-  @Test
-  public void getAdviseAllEventsOrCachedForConcurrentUpdateShouldSucceed(
-      ParallelExecutor executor) throws Exception {
-
-    DistributionAdvisor advisor = createCacheDistributionAdvisor();
-    CacheProfile profile = createCacheProfile();
-    advisor.putProfile(profile, true);
-
-    executor.inParallel(() -> {
-      ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
-    }, count);
-    executor.execute();
-
-    assertTrue(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached()
-        .contains(profile.getDistributedMember()));
-    assertEquals(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached().size(), 1);
-
-  }
-
-  @Test
-  public void getAdviseUpdateForConcurrentUpdateShouldSucceed(
-      ParallelExecutor executor) throws Exception {
-
-    EntryEventImpl event = new EntryEventImpl();
-    event.setNewValue(null);
-    event.setOperation(Operation.CREATE);
-
-    DistributionAdvisor advisor = createCacheDistributionAdvisor();
-    CacheProfile profile = createCacheProfile();
-    advisor.putProfile(profile, true);
-
-    executor.inParallel(() -> {
-      ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
-    }, count);
-    executor.execute();
-
-    assertTrue(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached()
-        .contains(profile.getDistributedMember()));
-    assertEquals(((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached().size(), 1);
-
-  }
-
-  private DistributionAdvisor createCacheDistributionAdvisor() {
-    CacheDistributionAdvisee advisee = mock(CacheDistributionAdvisee.class);
-    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
-    when(advisee.getCancelCriterion()).thenReturn(cancelCriterion);
-    DistributionManager distributionManager = mock(DistributionManager.class);
-    when(advisee.getDistributionManager()).thenReturn(distributionManager);
-    CacheDistributionAdvisor result =
-        CacheDistributionAdvisor.createCacheDistributionAdvisor(advisee);
-    when(advisee.getDistributionAdvisor()).thenReturn(result);
-    return result;
-  }
-
-  private CacheProfile createCacheProfile() throws UnknownHostException {
-    InternalDistributedMember member =
-        new InternalDistributedMember(InetAddress.getLocalHost(), 0, false,
-            false, MemberAttributes.DEFAULT);
-    return new CacheProfile(member, 1);
-  }
-}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
index a2e701a..0a33f51 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
@@ -171,12 +171,6 @@ public class DistributionAdvisor {
   private int numActiveProfiles = 0;
 
   /**
-   * Profiles version number
-   */
-  protected volatile long profilesVersion = 0;
-
-
-  /**
    * A collection of MembershipListeners that want to be notified when a profile is added to or
    * removed from this DistributionAdvisor. The keys are membership listeners and the values are
    * Boolean.TRUE.
@@ -1319,7 +1313,8 @@ public class DistributionAdvisor {
   // must synchronize when modifying profile array
   private synchronized boolean basicAddProfile(Profile p) {
     // don't add more than once, but replace existing profile
-    profilesVersion++;
+    // try {
+
     int index = indexOfMemberId(p.getId());
     if (index >= 0) {
       Profile[] oldProfiles = profiles; // volatile read
@@ -1345,16 +1340,17 @@ public class DistributionAdvisor {
    * Perform work of removing the given member from this advisor.
    */
   private synchronized Profile basicRemoveMemberId(ProfileId id) {
-
+    // try {
     int i = indexOfMemberId(id);
     if (i >= 0) {
-      profilesVersion++;
       Profile profileRemoved = profiles[i];
       basicRemoveIndex(i);
       return profileRemoved;
     } else
       return null;
-
+    // } finally {
+    // Assert.assertTrue(-1 == indexOfMemberId(id));
+    // }
   }
 
   private int indexOfMemberId(ProfileId id) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
index 725cc61..1f3c0ba 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
@@ -112,19 +112,6 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
 
   // moved removedProfiles to DistributionAdvisor
 
-  private Set<InternalDistributedMember> adviseSetforAllEvents = Collections.emptySet();
-  private volatile long adviseAllEventsVersion = -1;
-
-  private Set<InternalDistributedMember> adviseSetforUpdate = Collections.emptySet();
-  private volatile long adviseUpdateVersion = -1;
-
-  private volatile long inRecoveryVersion = 0;
-  private volatile long adviseInRecoveryVersion = -1;
-
-  public synchronized void incInRecoveryVersion() {
-    inRecoveryVersion++;
-  }
-
   /** Creates a new instance of CacheDistributionAdvisor */
   protected CacheDistributionAdvisor(CacheDistributionAdvisee region) {
     super(region);
@@ -153,35 +140,19 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
   /**
    * Returns a the set of members that either want all events or are caching data.
    *
+   * @param excludeInRecovery if true then members in recovery are excluded
    */
-  Set<InternalDistributedMember> adviseAllEventsOrCached()
+  private Set<InternalDistributedMember> adviseAllEventsOrCached(final boolean excludeInRecovery)
       throws IllegalStateException {
     getAdvisee().getCancelCriterion().checkCancelInProgress(null);
-
-    // minimize volatile reads by copying ref to local var
-    long tempProfilesVersion = profilesVersion; // volatile read
-    long tempInRecoveryVersion = inRecoveryVersion; // volatile read
-
-    if (adviseAllEventsVersion != tempProfilesVersion
-        || adviseInRecoveryVersion != tempInRecoveryVersion) {
-      synchronized (adviseSetforAllEvents) {
-        if (adviseAllEventsVersion != tempProfilesVersion
-            || adviseInRecoveryVersion != tempInRecoveryVersion) {
-
-          adviseSetforAllEvents = Collections.unmodifiableSet(adviseFilter(profile -> {
-            CacheProfile cp = (CacheProfile) profile;
-            if (cp.getInRecovery()) {
-              return false;
-            }
-            return cp.cachedOrAllEventsWithListener();
-          }));
-          adviseAllEventsVersion = tempProfilesVersion;
-          adviseInRecoveryVersion = tempInRecoveryVersion;
-        }
+    return adviseFilter(profile -> {
+      assert profile instanceof CacheProfile;
+      CacheProfile cp = (CacheProfile) profile;
+      if (excludeInRecovery && cp.inRecovery) {
+        return false;
       }
-    }
-    return adviseSetforAllEvents;
-
+      return cp.cachedOrAllEventsWithListener();
+    });
   }
 
   /**
@@ -191,30 +162,18 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
   Set adviseUpdate(final EntryEventImpl event) throws IllegalStateException {
     if (event.hasNewValue() || event.getOperation().isPutAll()) {
       // only need to distribute it to members that want all events or cache data
-      return adviseAllEventsOrCached();
+      return adviseAllEventsOrCached(true/* fixes 41147 */);
     } else {
       // The new value is null so this is a create with a null value,
       // in which case we only need to distribute this message to replicates
       // or all events that are not a proxy or if a proxy has a listener
-
-      // minimize volatile reads by copying ref to local var
-      long tempProfilesVersion = profilesVersion; // volatile read
-
-      if (adviseUpdateVersion != tempProfilesVersion) {
-        synchronized (adviseSetforUpdate) {
-          if (adviseUpdateVersion != tempProfilesVersion) {
-
-            adviseSetforUpdate = Collections.unmodifiableSet(adviseFilter(profile -> {
-              CacheProfile cp = (CacheProfile) profile;
-              DataPolicy dp = cp.getDataPolicy();
-              return dp.withReplication()
-                  || (cp.allEvents() && (dp.withStorage() || cp.hasCacheListener));
-            }));
-            adviseUpdateVersion = tempProfilesVersion;
-          }
-        }
-      }
-      return adviseSetforUpdate;
+      return adviseFilter(profile -> {
+        assert profile instanceof CacheProfile;
+        CacheProfile cp = (CacheProfile) profile;
+        DataPolicy dp = cp.dataPolicy;
+        return dp.withReplication()
+            || (cp.allEvents() && (dp.withStorage() || cp.hasCacheListener));
+      });
     }
   }
 
@@ -291,7 +250,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
    * Same as adviseGeneric except in recovery excluded.
    */
   public Set<InternalDistributedMember> adviseCacheOp() {
-    return adviseAllEventsOrCached();
+    return adviseAllEventsOrCached(true);
   }
 
   /*
@@ -301,7 +260,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
     return adviseFilter(profile -> {
       assert profile instanceof CacheProfile;
       CacheProfile cp = (CacheProfile) profile;
-      return !cp.getInRecovery();
+      return !cp.inRecovery;
     });
   }
 
@@ -324,7 +283,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
       assert profile instanceof CacheProfile;
       CacheProfile prof = (CacheProfile) profile;
       // if region in cache is in recovery, exclude
-      if (prof.getInRecovery()) {
+      if (prof.inRecovery) {
         return false;
       }
 
@@ -405,7 +364,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
       }
 
       // if region in cache is in recovery, exclude
-      if (profile.getInRecovery()) {
+      if (profile.inRecovery) {
         uninitialized.add(profile.getDistributedMember());
         continue;
       }
@@ -494,13 +453,12 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
    */
   public static class CacheProfile extends DistributionAdvisor.Profile {
     public DataPolicy dataPolicy = DataPolicy.REPLICATE;
-
     public InterestPolicy interestPolicy = InterestPolicy.DEFAULT;
     public boolean hasCacheLoader = false;
     public boolean hasCacheWriter = false;
     public boolean hasCacheListener = false;
     public Scope scope = Scope.DISTRIBUTED_NO_ACK;
-    private boolean inRecovery = false;
+    public boolean inRecovery = false;
     public Set<String> gatewaySenderIds = Collections.emptySet();
     public Set<String> asyncEventQueueIds = Collections.emptySet();
     /**
@@ -652,18 +610,6 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
       return dataPolicy.withPersistence();
     }
 
-    public boolean getInRecovery() {
-      return inRecovery;
-    };
-
-    public void setInRecovery(boolean recovery) {
-      inRecovery = recovery;
-    };
-
-    public DataPolicy getDataPolicy() {
-      return dataPolicy;
-    }
-
     /** Set the profile data information that is stored in a short */
     protected void setIntInfo(int s) {
       if ((s & REPLICATE_MASK) != 0) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index aa96f44..0b13f2d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -345,7 +345,7 @@ public abstract class DistributedCacheOperation {
 
     try {
       // Recipients with CacheOp
-      Set<InternalDistributedMember> recipients = new HashSet<>(getRecipients());
+      Set<InternalDistributedMember> recipients = getRecipients();
       Map<InternalDistributedMember, PersistentMemberID> persistentIds = null;
       if (region.getDataPolicy().withPersistence()) {
         persistentIds = region.getDistributionAdvisor().adviseInitializedPersistentMembers();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index d04882a..f13bce3 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2214,12 +2214,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
     cacheProfile.hasCacheListener = hasListener();
     Assert.assertTrue(scope.isDistributed());
     cacheProfile.scope = scope;
-
-    boolean newInRecovery = getImageState().getInRecovery();
-    if (cacheProfile.getInRecovery() != newInRecovery) {
-      distAdvisor.incInRecoveryVersion();
-    }
-    cacheProfile.setInRecovery(newInRecovery);
+    cacheProfile.inRecovery = getImageState().getInRecovery();
     cacheProfile.isPersistent = getDataPolicy().withPersistence();
     cacheProfile.setSubscriptionAttributes(getSubscriptionAttributes());
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheDistributionAdvisorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheDistributionAdvisorTest.java
deleted file mode 100644
index 010e658..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheDistributionAdvisorTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.Set;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.cache.DataPolicy;
-import org.apache.geode.cache.Operation;
-import org.apache.geode.distributed.internal.DistributionAdvisor;
-import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
-
-
-public class CacheDistributionAdvisorTest {
-  DistributionAdvisor advisor;
-
-  @Before
-  public void setUp() {
-    CacheDistributionAdvisee advisee = mock(CacheDistributionAdvisee.class);
-    DistributionManager distributionManager = mock(DistributionManager.class);
-    when(advisee.getDistributionManager()).thenReturn(distributionManager);
-    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
-    when(advisee.getCancelCriterion()).thenReturn(cancelCriterion);
-    advisor =
-        CacheDistributionAdvisor.createCacheDistributionAdvisor(advisee);
-    when(advisee.getDistributionAdvisor()).thenReturn(advisor);
-
-  }
-
-  @Test
-  public void testAdviseAllEventsOrCached() {
-    CacheProfile profile = mock(CacheProfile.class);
-    InternalDistributedMember member = mock(InternalDistributedMember.class);
-    when(profile.getId()).thenReturn(member);
-    when(profile.getInRecovery()).thenReturn(false);
-    when(profile.cachedOrAllEventsWithListener()).thenReturn(true);
-    when(profile.getDistributedMember()).thenReturn(member);
-
-    advisor.putProfile(profile, true);
-    Set<InternalDistributedMember> targets1 =
-        ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
-
-    Set<InternalDistributedMember> targets2 =
-        ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
-
-    Set<InternalDistributedMember> targets3 =
-        ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
-
-    verify(profile, times(1)).getInRecovery();
-    verify(profile, times(1)).cachedOrAllEventsWithListener();
-
-  }
-
-  @Test
-  public void testAdviseAllEventsOrCached2() {
-    CacheProfile profile = mock(CacheProfile.class);
-    InternalDistributedMember member = mock(InternalDistributedMember.class);
-    when(profile.getId()).thenReturn(member);
-    when(profile.getInRecovery()).thenReturn(false);
-    when(profile.cachedOrAllEventsWithListener()).thenReturn(true);
-    when(profile.getDistributedMember()).thenReturn(member);
-    CacheProfile profile2 = mock(CacheProfile.class);
-    InternalDistributedMember member2 = mock(InternalDistributedMember.class);
-    when(profile2.getId()).thenReturn(member2);
-    when(profile2.getInRecovery()).thenReturn(false);
-    when(profile2.cachedOrAllEventsWithListener()).thenReturn(true);
-    when(profile2.getDistributedMember()).thenReturn(member2);
-
-    advisor.putProfile(profile, true);
-    Set<InternalDistributedMember> targets1 =
-        ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
-
-    advisor.putProfile(profile2, true);
-    Set<InternalDistributedMember> targets2 =
-        ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
-
-    Set<InternalDistributedMember> targets3 =
-        ((CacheDistributionAdvisor) advisor).adviseAllEventsOrCached();
-
-    verify(profile, times(2)).getInRecovery();
-    verify(profile, times(2)).cachedOrAllEventsWithListener();
-    verify(profile2, times(1)).getInRecovery();
-    verify(profile2, times(1)).cachedOrAllEventsWithListener();
-
-  }
-
-
-  @Test
-  public void testAdviseUpdate() {
-    CacheProfile profile = mock(CacheProfile.class);
-    InternalDistributedMember member = mock(InternalDistributedMember.class);
-    EntryEventImpl event = mock(EntryEventImpl.class);
-    when(event.hasNewValue()).thenReturn(false);
-    when(event.getOperation()).thenReturn(Operation.CREATE);
-
-    when(profile.getId()).thenReturn(member);
-    when(profile.cachedOrAllEventsWithListener()).thenReturn(true);
-    when(profile.getDistributedMember()).thenReturn(member);
-    when(profile.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
-
-    advisor.putProfile(profile, true);
-    Set<InternalDistributedMember> targets1 =
-        ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
-
-    Set<InternalDistributedMember> targets2 =
-        ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
-
-    Set<InternalDistributedMember> targets3 =
-        ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
-
-    verify(profile, times(1)).getDataPolicy();
-  }
-
-  @Test
-  public void testAdviseUpdate2() {
-    CacheProfile profile = mock(CacheProfile.class);
-    InternalDistributedMember member = mock(InternalDistributedMember.class);
-    EntryEventImpl event = mock(EntryEventImpl.class);
-    when(event.hasNewValue()).thenReturn(false);
-    when(event.getOperation()).thenReturn(Operation.CREATE);
-
-    when(profile.getId()).thenReturn(member);
-    when(profile.cachedOrAllEventsWithListener()).thenReturn(true);
-    when(profile.getDistributedMember()).thenReturn(member);
-    when(profile.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
-
-    CacheProfile profile2 = mock(CacheProfile.class);
-    InternalDistributedMember member2 = mock(InternalDistributedMember.class);
-    when(profile2.getId()).thenReturn(member2);
-    when(profile2.cachedOrAllEventsWithListener()).thenReturn(true);
-    when(profile2.getDistributedMember()).thenReturn(member2);
-    when(profile2.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
-
-    when(event.hasNewValue()).thenReturn(false);
-    when(event.getOperation()).thenReturn(Operation.CREATE);
-
-    advisor.putProfile(profile, true);
-    Set<InternalDistributedMember> targets1 =
-        ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
-
-    advisor.putProfile(profile2, true);
-    Set<InternalDistributedMember> targets2 =
-        ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
-
-    Set<InternalDistributedMember> targets3 =
-        ((CacheDistributionAdvisor) advisor).adviseUpdate(event);
-
-    verify(profile, times(2)).getDataPolicy();
-    verify(profile2, times(1)).getDataPolicy();
-
-  }
-
-
-}