You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2019/05/24 00:04:40 UTC

[geode] branch feature/GEODE-6802 updated: GEODE-6802: Execute region synchronization on newly joined member.

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-6802
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-6802 by this push:
     new da960f3  GEODE-6802: Execute region synchronization on newly joined member.
da960f3 is described below

commit da960f389d5c54a23f6bfa33363b637b550bafb4
Author: eshu <es...@pivotal.io>
AuthorDate: Thu May 23 16:52:30 2019 -0700

    GEODE-6802: Execute region synchronization on newly joined member.
    
     * Region sync will be invoked to avoid data inconsistency on a newly joined/restarted member
       if it received a region synchronization request from other members due to timed task.
     * Use a flag in RegionVersionHolder to ensure only one such call is executed.
     * Make sure RVV exception is filled for persistent member requesting region sync. (This may
       leads to CommitConflictException on some transactions on persistent regions due to this
       region sync operation. But it should be rare and is acceptable compared to data inconsistency
       issue.)
---
 .../distributed/internal/DistributionAdvisor.java  |  27 +++--
 .../geode/internal/cache/DistributedRegion.java    |  19 +++
 .../internal/cache/InitialImageOperation.java      |  33 ++++-
 .../internal/cache/persistence/DiskStoreID.java    |   5 +
 .../cache/versions/RegionVersionHolder.java        |  21 +++-
 .../cache/versions/RegionVersionVector.java        |  43 ++++---
 .../internal/cache/versions/VersionSource.java     |   4 +
 .../internal/cache/DistributedRegionTest.java      |  53 ++++++++
 .../internal/cache/InitialImageOperationTest.java  |  32 ++++-
 .../cache/versions/RegionVersionHolderTest.java    |  41 +++++++
 .../cache/versions/RegionVersionVectorTest.java    | 135 +++++++++++++++++++++
 11 files changed, 377 insertions(+), 36 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
index 669f59a..78d33a4 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
@@ -265,6 +265,14 @@ public class DistributionAdvisor {
     if (isDebugEnabled) {
       logger.debug("da.syncForCrashedMember will sync region in cache's timer for region: {}", dr);
     }
+    CacheProfile cacheProfile = (CacheProfile) profile;
+    PersistentMemberID persistentId = getPersistentID(cacheProfile);
+    VersionSource lostVersionID;
+    if (persistentId != null) {
+      lostVersionID = persistentId.getVersionMember();
+    } else {
+      lostVersionID = id;
+    }
     // schedule the synchronization for execution in the future based on the client health monitor
     // interval. This allows client caches to retry an operation that might otherwise be recovered
     // through the sync operation. Without associated event information this could cause the
@@ -289,11 +297,9 @@ public class DistributionAdvisor {
             }
           }
         }
-        CacheProfile cp = (CacheProfile) profile;
-        PersistentMemberID persistentId = cp.persistentID;
         if (dr.getDataPolicy().withPersistence() && persistentId == null) {
           // Fix for 46704. The lost member may be a replicate
-          // or an empty accessor. We don't need to to a synchronization
+          // or an empty accessor. We don't need to do a synchronization
           // in that case, because those members send their writes to
           // a persistent member.
           if (isDebugEnabled) {
@@ -303,17 +309,16 @@ public class DistributionAdvisor {
           }
           return;
         }
-
-
-        VersionSource lostVersionID;
-        if (persistentId != null) {
-          lostVersionID = persistentId.getVersionMember();
-        } else {
-          lostVersionID = id;
-        }
         dr.synchronizeForLostMember(id, lostVersionID);
       }
     }, delay);
+    if (dr.getConcurrencyChecksEnabled()) {
+      dr.setRegionSynchronizeScheduled(lostVersionID);
+    }
+  }
+
+  private PersistentMemberID getPersistentID(CacheProfile cp) {
+    return cp.persistentID;
   }
 
   /** find the region for a delta-gii operation (synch) */
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index eb6f0e5..7602758 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -117,6 +117,7 @@ import org.apache.geode.internal.cache.tx.RemoteFetchVersionMessage.FetchVersion
 import org.apache.geode.internal.cache.tx.RemoteInvalidateMessage;
 import org.apache.geode.internal.cache.tx.RemotePutMessage;
 import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
+import org.apache.geode.internal.cache.versions.RegionVersionHolder;
 import org.apache.geode.internal.cache.versions.RegionVersionVector;
 import org.apache.geode.internal.cache.versions.VersionSource;
 import org.apache.geode.internal.cache.versions.VersionTag;
@@ -1298,6 +1299,24 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
     op.synchronizeWith(target, versionMember, lostMember);
   }
 
+
+  public void setRegionSynchronizeScheduled(VersionSource lostMemberVersionID) {
+    RegionVersionHolder regionVersionHolder =
+        getVersionVector().getHolderForMember(lostMemberVersionID);
+    if (regionVersionHolder != null) {
+      regionVersionHolder.setRegionSynchronizeScheduled();
+    }
+  }
+
+  public boolean setRegionSynchronizedWithIfNotScheduled(VersionSource lostMemberVersionID) {
+    RegionVersionHolder regionVersionHolder =
+        getVersionVector().getHolderForMember(lostMemberVersionID);
+    if (regionVersionHolder != null) {
+      return regionVersionHolder.setRegionSynchronizeScheduledOrDoneIfNot();
+    }
+    return false;
+  }
+
   /** remove any partial entries received in a failed GII */
   void cleanUpAfterFailedGII(boolean recoverFromDisk) {
     DiskRegion dskRgn = getDiskRegion();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
index 81624f1..449148e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
@@ -808,7 +808,6 @@ public class InitialImageOperation {
   boolean processChunk(List entries, InternalDistributedMember sender, Version remoteVersion)
       throws IOException, ClassNotFoundException {
     final boolean isDebugEnabled = logger.isDebugEnabled();
-    final boolean isTraceEnabled = logger.isTraceEnabled();
 
     // one volatile read of test flag
     int slow = slowImageProcessing;
@@ -896,8 +895,8 @@ public class InitialImageOperation {
         if (diskRegion != null) {
           // verify if entry from GII is the same as the one from recovery
           RegionEntry regionEntry = this.entries.getEntry(entry.key);
-          if (isTraceEnabled) {
-            logger.trace("processChunk:entry={},tag={},re={}", entry, tag, regionEntry);
+          if (isDebugEnabled) {
+            logger.debug("processChunk:entry={},tag={},re={}", entry, tag, regionEntry);
           }
           // re will be null if the gii chunk gives us a create
           if (regionEntry != null) {
@@ -974,8 +973,8 @@ public class InitialImageOperation {
             if (tag != null) {
               tag.replaceNullIDs(sender);
             }
-            if (isTraceEnabled) {
-              logger.trace(
+            if (isDebugEnabled) {
+              logger.debug(
                   "processChunk:initialImagePut:key={},lastModified={},tmpValue={},wasRecovered={},tag={}",
                   entry.key, lastModified, tmpValue, wasRecovered, tag);
             }
@@ -1619,12 +1618,15 @@ public class InitialImageOperation {
       final boolean lclAbortTest = abortTest;
       if (lclAbortTest)
         abortTest = false;
-
+      DistributedRegion targetRegion = null;
       boolean sendFailureMessage = true;
       try {
         Assert.assertTrue(this.regionPath != null, "Region path is null.");
         final DistributedRegion rgn =
             (DistributedRegion) getGIIRegion(dm, this.regionPath, this.targetReinitialized);
+        if (lostMemberID != null) {
+          targetRegion = rgn;
+        }
         if (rgn == null) {
           return;
         }
@@ -1878,6 +1880,16 @@ public class InitialImageOperation {
           sendFailureMessage(dm, rex);
         } // !success
 
+        if (lostMemberID != null && targetRegion != null) {
+          if (lostMemberVersionID == null) {
+            lostMemberVersionID = lostMemberID;
+          }
+          // check to see if the region in this cache needs to synchronize with others
+          // it is possible that the cache is recover/restart of a member and not
+          // scheduled to synchronize with others
+          synchronizeIfNotScheduled(targetRegion, lostMemberID, lostMemberVersionID);
+        }
+
         if (internalAfterSentImageReply != null
             && regionPath.endsWith(internalAfterSentImageReply.getRegionName())) {
           internalAfterSentImageReply.run();
@@ -1891,6 +1903,15 @@ public class InitialImageOperation {
           null, null);
     }
 
+    void synchronizeIfNotScheduled(DistributedRegion region,
+        InternalDistributedMember lostMember, VersionSource lostVersionSource) {
+      if (region.setRegionSynchronizedWithIfNotScheduled(lostVersionSource)) {
+        // if region synchronization has not been scheduled or performed,
+        // we do synchronization with others right away as we received the synchronization request
+        // indicating timed task has been triggered on other nodes
+        region.synchronizeForLostMember(lostMember, lostVersionSource);
+      }
+    }
 
     /**
      * Serialize the entries into byte[] chunks, calling proc for each one. proc args: the byte[]
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskStoreID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskStoreID.java
index 601d248..0aaafea 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskStoreID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskStoreID.java
@@ -146,4 +146,9 @@ public class DiskStoreID implements VersionSource<DiskStoreID>, Serializable {
     return Long.toHexString(mostSig).substring(8);
   }
 
+  @Override
+  public boolean isDiskStoreId() {
+    return true;
+  }
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionHolder.java b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionHolder.java
index f378e32..de3cfee 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionHolder.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionHolder.java
@@ -63,6 +63,8 @@ public class RegionVersionHolder<T> implements Cloneable, DataSerializable {
   private List<RVVException> exceptions;
   boolean isDepartedMember;
 
+  private transient boolean regionSynchronizeScheduledOrDone;
+
   // non final for tests
   @MutableForTesting
   public static int BIT_SET_WIDTH = 64 * 16; // should be a multiple of 4 64-bit longs
@@ -140,8 +142,6 @@ public class RegionVersionHolder<T> implements Cloneable, DataSerializable {
     return getExceptions().toString();
   }
 
-
-  /* test only method */
   public void setVersion(long ver) {
     this.version = ver;
   }
@@ -383,7 +383,7 @@ public class RegionVersionHolder<T> implements Cloneable, DataSerializable {
   /**
    * Add an exception that is older than this.bitSetVersion.
    */
-  protected synchronized void addException(long previousVersion, long nextVersion) {
+  synchronized void addException(long previousVersion, long nextVersion) {
     if (this.exceptions == null) {
       this.exceptions = new LinkedList<RVVException>();
     }
@@ -791,4 +791,19 @@ public class RegionVersionHolder<T> implements Cloneable, DataSerializable {
     return canon;
   }
 
+  private synchronized boolean isRegionSynchronizeScheduledOrDone() {
+    return regionSynchronizeScheduledOrDone;
+  }
+
+  public synchronized void setRegionSynchronizeScheduled() {
+    regionSynchronizeScheduledOrDone = true;
+  }
+
+  public synchronized boolean setRegionSynchronizeScheduledOrDoneIfNot() {
+    if (!isRegionSynchronizeScheduledOrDone()) {
+      regionSynchronizeScheduledOrDone = true;
+      return true;
+    }
+    return false;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java
index 66a2a6b..b645f55 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java
@@ -242,7 +242,14 @@ public abstract class RegionVersionVector<T extends VersionSource<?>>
     liveHolders = new HashMap<T, RegionVersionHolder<T>>(this.memberToVersion);
     RegionVersionHolder<T> holder = liveHolders.get(mbr);
     if (holder == null) {
-      holder = new RegionVersionHolder<T>(-1);
+      if (mbr.isDiskStoreId() && mbr.equals(myId)) {
+        // For region recovered from disk, we may have local exceptions needs to be
+        // brought back during region synchronization
+        holder = localExceptions.clone();
+        holder.setVersion(localVersion.get());
+      } else {
+        holder = new RegionVersionHolder<T>(-1);
+      }
     } else {
       holder = holder.clone();
     }
@@ -804,26 +811,34 @@ public abstract class RegionVersionVector<T extends VersionSource<?>>
    * @return true if this vector has seen the given version
    */
   public boolean contains(T id, long version) {
-    if (id.equals(this.myId)) {
-      if (isForSynchronization()) {
-        // a sync vector only has one holder & no valid version for the vector's owner
+    RegionVersionHolder<T> holder = this.memberToVersion.get(id);
+    // For region synchronization.
+    if (isForSynchronization()) {
+      if (holder == null) {
+        // we only care about missing changes from a particular member, and this
+        // vector is known to contain that member's version holder
         return true;
       }
+      if (id.equals(this.myId)) {
+        if (!myId.isDiskStoreId()) {
+          // a sync vector only has one holder if not recovered from persistence,
+          // no valid version for the vector's owner
+          return true;
+        }
+      }
+      return holder.contains(version);
+    }
+
+    // Regular GII
+    if (id.equals(this.myId)) {
       if (getCurrentVersion() < version) {
         return false;
       } else {
         return !localExceptions.hasExceptionFor(version);
       }
     }
-    RegionVersionHolder<T> holder = this.memberToVersion.get(id);
     if (holder == null) {
-      if (this.singleMember) {
-        // we only care about missing changes from a particular member, and this
-        // vector is known to contain that member's version holder
-        return true;
-      } else {
-        return false;
-      }
+      return false;
     } else {
       return holder.contains(version);
     }
@@ -1151,7 +1166,7 @@ public abstract class RegionVersionVector<T extends VersionSource<?>>
       if (cId != null) {
         return cId;
       }
-      if (id instanceof InternalDistributedMember) {
+      if (!id.isDiskStoreId()) {
         InternalDistributedSystem system = InternalDistributedSystem.getConnectedInstance();
         if (system != null) {
           can = (T) system.getDistributionManager().getCanonicalId((InternalDistributedMember) id);
@@ -1456,7 +1471,7 @@ public abstract class RegionVersionVector<T extends VersionSource<?>>
   }
 
   public static RegionVersionVector<?> create(VersionSource<?> versionMember, LocalRegion owner) {
-    if (versionMember instanceof DiskStoreID) {
+    if (versionMember.isDiskStoreId()) {
       return new DiskRegionVersionVector((DiskStoreID) versionMember, owner);
     } else {
       return new VMRegionVersionVector((InternalDistributedMember) versionMember, owner);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/VersionSource.java b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/VersionSource.java
index e52dfe4..7d349d9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/VersionSource.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/VersionSource.java
@@ -31,4 +31,8 @@ import org.apache.geode.internal.DataSerializableFixedID;
 public interface VersionSource<T> extends DataSerializableFixedID, Comparable<T> {
 
   void writeEssentialData(DataOutput out) throws IOException;
+
+  default boolean isDiskStoreId() {
+    return false;
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
index 59454c3..143072f 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
@@ -24,11 +24,25 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.geode.internal.cache.versions.RegionVersionHolder;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
+import org.apache.geode.internal.cache.versions.VersionSource;
 
 
 public class DistributedRegionTest {
+  private RegionVersionVector vector;
+  private RegionVersionHolder holder;
+  private VersionSource lostMemberVersionID;
+
+  @Before
+  public void setup() {
+    vector = mock(RegionVersionVector.class);
+    holder = mock(RegionVersionHolder.class);
+    lostMemberVersionID = mock(VersionSource.class);
+  }
 
   @Test
   public void shouldBeMockable() throws Exception {
@@ -93,4 +107,43 @@ public class DistributedRegionTest {
     assertThat(distributedRegion.lockWhenRegionIsInitializing()).isFalse();
     verify(distributedRegion, never()).lockFailedInitialImageReadLock();
   }
+
+  @Test
+  public void versionHolderInvokesSetRegionSynchronizeScheduledIfVectorContainsLostMemberID() {
+    DistributedRegion distributedRegion = mock(DistributedRegion.class);
+    when(distributedRegion.getVersionVector()).thenReturn(vector);
+    when(vector.getHolderForMember(lostMemberVersionID)).thenReturn(holder);
+    doCallRealMethod().when(distributedRegion).setRegionSynchronizeScheduled(lostMemberVersionID);
+
+    distributedRegion.setRegionSynchronizeScheduled(lostMemberVersionID);
+
+    verify(holder).setRegionSynchronizeScheduled();
+  }
+
+  @Test
+  public void versionHolderInvokesSetRegionSynchronizeScheduledOrDoneIfNotIfVectorContainsLostMemberID() {
+    DistributedRegion distributedRegion = mock(DistributedRegion.class);
+    when(distributedRegion.getVersionVector()).thenReturn(vector);
+    when(vector.getHolderForMember(lostMemberVersionID)).thenReturn(holder);
+    doCallRealMethod().when(distributedRegion)
+        .setRegionSynchronizedWithIfNotScheduled(lostMemberVersionID);
+    when(holder.setRegionSynchronizeScheduledOrDoneIfNot()).thenReturn(true);
+
+    assertThat(distributedRegion.setRegionSynchronizedWithIfNotScheduled(lostMemberVersionID))
+        .isTrue();
+
+    verify(holder).setRegionSynchronizeScheduledOrDoneIfNot();
+  }
+
+  @Test
+  public void setRegionSynchronizedWithIfNotScheduledReturnsFalseIfVectorDoesNotContainLostMemberID() {
+    DistributedRegion distributedRegion = mock(DistributedRegion.class);
+    when(distributedRegion.getVersionVector()).thenReturn(vector);
+    when(vector.getHolderForMember(lostMemberVersionID)).thenReturn(holder);
+
+    assertThat(distributedRegion.setRegionSynchronizedWithIfNotScheduled(lostMemberVersionID))
+        .isFalse();
+
+    verify(holder, never()).setRegionSynchronizeScheduledOrDoneIfNot();
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java
index 64a09bb..26663a5 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -27,6 +28,8 @@ import org.junit.Test;
 import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.versions.VersionSource;
 
 public class InitialImageOperationTest {
 
@@ -34,6 +37,10 @@ public class InitialImageOperationTest {
   private String path;
   private LocalRegion region;
   private InternalCache cache;
+  private InitialImageOperation.RequestImageMessage message;
+  private DistributedRegion distributedRegion;
+  private InternalDistributedMember lostMember;
+  private VersionSource versionSource;
 
   @Before
   public void setUp() {
@@ -42,6 +49,10 @@ public class InitialImageOperationTest {
     cache = mock(InternalCache.class);
     dm = mock(ClusterDistributionManager.class);
     region = mock(LocalRegion.class);
+    message = spy(new InitialImageOperation.RequestImageMessage());
+    distributedRegion = mock(DistributedRegion.class);
+    lostMember = mock(InternalDistributedMember.class);
+    versionSource = mock(VersionSource.class);
 
     when(dm.getExistingCache()).thenReturn(cache);
     when(cache.getRegion(path)).thenReturn(region);
@@ -57,8 +68,6 @@ public class InitialImageOperationTest {
 
   @Test
   public void processRequestImageMessageWillSendFailureMessageIfGotCancelException() {
-    InitialImageOperation.RequestImageMessage message =
-        spy(new InitialImageOperation.RequestImageMessage());
     message.regionPath = "regionPath";
     when(dm.getExistingCache()).thenThrow(new CacheClosedException());
 
@@ -66,4 +75,23 @@ public class InitialImageOperationTest {
 
     verify(message).sendFailureMessage(eq(dm), eq(null));
   }
+
+  @Test
+  public void synchronizeForLostMemberIsInvokedIfRegionHasNotScheduledOrDoneSynchronization() {
+    when(distributedRegion.setRegionSynchronizedWithIfNotScheduled(versionSource)).thenReturn(true);
+
+    message.synchronizeIfNotScheduled(distributedRegion, lostMember, versionSource);
+
+    verify(distributedRegion).synchronizeForLostMember(lostMember, versionSource);
+  }
+
+  @Test
+  public void synchronizeForLostMemberIsNotInvokedIfRegionHasScheduledOrDoneSynchronization() {
+    when(distributedRegion.setRegionSynchronizedWithIfNotScheduled(versionSource))
+        .thenReturn(false);
+
+    message.synchronizeIfNotScheduled(distributedRegion, lostMember, versionSource);
+
+    verify(distributedRegion, never()).synchronizeForLostMember(lostMember, versionSource);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionHolderTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionHolderTest.java
new file mode 100644
index 0000000..b8d3d53
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionHolderTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.versions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+import org.apache.geode.internal.cache.persistence.DiskStoreID;
+
+public class RegionVersionHolderTest {
+  @Test
+  public void setRegionSynchronizeScheduledCanSetSyncScheduledOrDone() {
+    DiskStoreID server = new DiskStoreID(0, 0);
+    RegionVersionHolder holder = new RegionVersionHolder(server);
+
+    holder.setRegionSynchronizeScheduled();
+    assertThat(holder.setRegionSynchronizeScheduledOrDoneIfNot()).isFalse();
+  }
+
+  @Test
+  public void setRegionSynchronizeScheduledOrDoneIfNotReturnsTrueIfSyncScheduledNotSet() {
+    DiskStoreID server = new DiskStoreID(0, 0);
+    RegionVersionHolder holder = new RegionVersionHolder(server);
+
+    assertThat(holder.setRegionSynchronizeScheduledOrDoneIfNot()).isTrue();
+    assertThat(holder.setRegionSynchronizeScheduledOrDoneIfNot()).isFalse();
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
index 1d052a3..ce328ca 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
@@ -100,7 +100,142 @@ public class RegionVersionVectorTest {
     assertFalse(singletonRVV.contains(server2, 6));
     assertFalse(singletonRVV.contains(server2, 7));
     assertFalse(singletonRVV.contains(server2, 9));
+  }
+
+  @Test
+  public void testSynchronizationVectorContainsAllVersionsForSameOwnerAsTargetAndNonTarget() {
+    final String local = getIPLiteral();
+    InternalDistributedMember server1 = new InternalDistributedMember(local, 101);
+    InternalDistributedMember server2 = new InternalDistributedMember(local, 102);
+    InternalDistributedMember server3 = new InternalDistributedMember(local, 103);
+
+    RegionVersionVector rv1 = new VMRegionVersionVector(server1);
+    rv1.updateLocalVersion(10);
+    rv1.recordVersion(server2, 1);
+    rv1.recordVersion(server2, 5);
+    rv1.recordVersion(server2, 8);
+    rv1.recordVersion(server3, 1);
+    rv1.recordVersion(server3, 3);
+    RegionVersionVector singletonRVV = rv1.getCloneForTransmission(server1);
+    assertTrue(singletonRVV.isForSynchronization());
+    assertEquals(singletonRVV.getOwnerId(), server1);
+    assertTrue(singletonRVV.getMemberToVersion().containsKey(server1));
+    assertFalse(singletonRVV.getMemberToVersion().containsKey(server2));
+    assertFalse(singletonRVV.getMemberToVersion().containsKey(server3));
+
+    assertTrue(singletonRVV.contains(server1, 1));
+    assertTrue(singletonRVV.contains(server1, 11));
+
+    assertTrue(singletonRVV.contains(server3, 1));
+    assertTrue(singletonRVV.contains(server3, 11));
+
+    assertTrue(singletonRVV.contains(server2, 1));
+    assertTrue(singletonRVV.contains(server2, 5));
+    assertTrue(singletonRVV.contains(server2, 8));
+    assertTrue(singletonRVV.contains(server2, 2));
+    assertTrue(singletonRVV.contains(server2, 3));
+    assertTrue(singletonRVV.contains(server2, 4));
+    assertTrue(singletonRVV.contains(server2, 6));
+    assertTrue(singletonRVV.contains(server2, 7));
+    assertTrue(singletonRVV.contains(server2, 9));
+  }
+
+  /**
+   * server1 will simulate doing a sync with another server for operations performed
+   * by server2. server3 is another server in the cluster that we don't care about
+   * servers have version source as dist store id
+   */
+  @Test
+  public void testSynchronizationVectorWithDiskStoreIdContainsAllVersionsForNonTarget() {
+    DiskStoreID server1 = new DiskStoreID(0, 0);
+    DiskStoreID server2 = new DiskStoreID(0, 1);
+    DiskStoreID server3 = new DiskStoreID(1, 0);
+
+    RegionVersionVector rv1 = new DiskRegionVersionVector(server1);
+    rv1.updateLocalVersion(10);
+    rv1.recordVersion(server2, 1);
+    rv1.recordVersion(server2, 5);
+    rv1.recordVersion(server2, 8);
+    rv1.recordVersion(server3, 1);
+    rv1.recordVersion(server3, 3);
+    RegionVersionVector singletonRVV = rv1.getCloneForTransmission(server2);
+    assertTrue(singletonRVV.isForSynchronization());
+    assertEquals(singletonRVV.getOwnerId(), server1);
+    assertTrue(singletonRVV.getMemberToVersion().containsKey(server2));
+    assertFalse(singletonRVV.getMemberToVersion().containsKey(server3));
+
+    assertTrue(singletonRVV.contains(server1, 1));
+    assertTrue(singletonRVV.contains(server1, 11));
+
+    assertTrue(singletonRVV.contains(server3, 1));
+    assertTrue(singletonRVV.contains(server3, 11));
+  }
+
+  @Test
+  public void testSynchronizationVectorWithDiskStoreIdContainsVersionsForTarget() {
+    DiskStoreID server1 = new DiskStoreID(0, 0);
+    DiskStoreID server2 = new DiskStoreID(0, 1);
+    DiskStoreID server3 = new DiskStoreID(1, 0);
+
+    RegionVersionVector rv1 = new DiskRegionVersionVector(server1);
+    rv1.updateLocalVersion(10);
+    rv1.recordVersion(server2, 1);
+    rv1.recordVersion(server2, 5);
+    rv1.recordVersion(server2, 8);
+    rv1.recordVersion(server3, 1);
+    rv1.recordVersion(server3, 3);
+    RegionVersionVector singletonRVV = rv1.getCloneForTransmission(server2);
+    assertTrue(singletonRVV.isForSynchronization());
+    assertEquals(singletonRVV.getOwnerId(), server1);
+    assertTrue(singletonRVV.getMemberToVersion().containsKey(server2));
+    assertFalse(singletonRVV.getMemberToVersion().containsKey(server3));
+
+    assertTrue(singletonRVV.contains(server2, 1));
+    assertTrue(singletonRVV.contains(server2, 5));
+    assertTrue(singletonRVV.contains(server2, 8));
+
+    assertFalse(singletonRVV.contains(server2, 2));
+    assertFalse(singletonRVV.contains(server2, 3));
+    assertFalse(singletonRVV.contains(server2, 4));
+    assertFalse(singletonRVV.contains(server2, 6));
+    assertFalse(singletonRVV.contains(server2, 7));
+    assertFalse(singletonRVV.contains(server2, 9));
+  }
 
+  @Test
+  public void testSynchronizationVectorWithDiskStoreIdContainsVersionsForTargetAsOriginator() {
+    DiskStoreID server1 = new DiskStoreID(0, 0);
+    DiskStoreID server2 = new DiskStoreID(0, 1);
+    DiskStoreID server3 = new DiskStoreID(1, 0);
+
+    RegionVersionVector rv1 = new DiskRegionVersionVector(server1);
+    RegionVersionHolder localExceptions = rv1.getLocalExceptions();
+    localExceptions.addException(2, 5);
+    localExceptions.addException(7, 9);
+    rv1.updateLocalVersion(10);
+    rv1.recordVersion(server2, 1);
+    rv1.recordVersion(server2, 5);
+    rv1.recordVersion(server2, 8);
+    rv1.recordVersion(server3, 1);
+    rv1.recordVersion(server3, 3);
+    RegionVersionVector singletonRVV = rv1.getCloneForTransmission(server1);
+    assertTrue(singletonRVV.isForSynchronization());
+    assertEquals(singletonRVV.getOwnerId(), server1);
+    assertTrue(singletonRVV.getMemberToVersion().containsKey(server1));
+    assertFalse(singletonRVV.getMemberToVersion().containsKey(server2));
+    assertFalse(singletonRVV.getMemberToVersion().containsKey(server3));
+
+    assertTrue(singletonRVV.contains(server1, 1));
+    assertTrue(singletonRVV.contains(server1, 2));
+    assertTrue(singletonRVV.contains(server1, 5));
+    assertTrue(singletonRVV.contains(server1, 6));
+    assertTrue(singletonRVV.contains(server1, 7));
+    assertTrue(singletonRVV.contains(server1, 9));
+    assertTrue(singletonRVV.contains(server1, 10));
+
+    assertFalse(singletonRVV.contains(server1, 3));
+    assertFalse(singletonRVV.contains(server1, 4));
+    assertFalse(singletonRVV.contains(server1, 8));
   }
 
   @Test