You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/12/12 22:36:35 UTC

[32/46] geode git commit: GEODE-2129 Make pdxType id random.

 GEODE-2129 Make pdxType id random.

Right now a pdxType id has 4 bytes. Of those 4 bytes, one byte is
reserved for the distributed-system-id; this makes sure that type ids
generated by different clusters are different. For the remaining three
bytes we just increment a counter to create a new pdxType id. In the
field, we have observed that these pdxType ids sometimes collide. One
reason could be that different clusters end up having the same
distributed-system-id. Thus, to avoid collisions, we will use the
hashcode of the pdxType for the remaining three bytes of the pdxType id.
That will reduce the possibility of a collision.

Apart from that, changed the getOldValue call to region.get in
PeerTypeRegistration.

updated

updated

updated

update


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/b0422711
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/b0422711
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/b0422711

Branch: refs/heads/feature/GEODE-1930
Commit: b04227117bbcc7d28473ad0e3743baa5e54bc022
Parents: 0c7dbed
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Thu Nov 17 16:43:18 2016 -0800
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Wed Nov 23 17:18:37 2016 -0800

----------------------------------------------------------------------
 .../org/apache/geode/pdx/internal/EnumId.java   |   2 +-
 .../org/apache/geode/pdx/internal/PdxType.java  |   2 +-
 .../pdx/internal/PeerTypeRegistration.java      |  96 ++++++++----
 .../geode/pdx/PdxAttributesJUnitTest.java       | 150 +++++++++++++++++++
 .../geode/pdx/PdxSerializableJUnitTest.java     |  28 +++-
 .../geode/internal/cache/wan/WANTestBase.java   |  11 ++
 .../cache/wan/misc/PDXNewWanDUnitTest.java      |  63 +++++++-
 7 files changed, 314 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/b0422711/geode-core/src/main/java/org/apache/geode/pdx/internal/EnumId.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/EnumId.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/EnumId.java
index 5d399eb..c8595d7 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/EnumId.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/EnumId.java
@@ -48,7 +48,7 @@ public class EnumId implements DataSerializableFixedID {
   }
 
   public int getDSId() {
-    return this.id >> 24;
+    return this.id >> 24 & 0xFF;
   }
 
   public int getEnumNum() {

http://git-wip-us.apache.org/repos/asf/geode/blob/b0422711/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxType.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxType.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxType.java
index b586f64..eaeee08 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxType.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxType.java
@@ -251,7 +251,7 @@ public class PdxType implements DataSerializable {
   }
 
   public int getDSId() {
-    return this.typeId >> 24;
+    return this.typeId >> 24 & 0xFF;
   }
 
   public int getTypeNum() {

http://git-wip-us.apache.org/repos/asf/geode/blob/b0422711/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
index 0226cca..877bd48 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
@@ -77,11 +77,11 @@ public class PeerTypeRegistration implements TypeRegistration {
    */
   public static final String REGION_NAME = "PdxTypes";
   public static final String REGION_FULL_PATH = "/" + REGION_NAME;
+  public static final int PLACE_HOLDER_FOR_TYPE_ID = 0xFFFFFF;
+  public static final int PLACE_HOLDER_FOR_DS_ID = 0xFF000000;
 
-  private int nextTypeId;
+  private int dsId;
   private final int maxTypeId;
-  private int nextEnumId;
-  private final int maxEnumId;
   private volatile DistributedLockService dls;
   private final Object dlsLock = new Object();
   private GemFireCacheImpl cache;
@@ -116,10 +116,8 @@ public class PeerTypeRegistration implements TypeRegistration {
     if (distributedSystemId == -1) {
       distributedSystemId = 0;
     }
-    this.nextTypeId = distributedSystemId << 24;
-    this.maxTypeId = distributedSystemId << 24 | 0xFFFFFF;
-    this.nextEnumId = distributedSystemId << 24;
-    this.maxEnumId = distributedSystemId << 24 | 0xFFFFFF;
+    this.dsId = distributedSystemId << 24;
+    this.maxTypeId = 0xFFFFFF;
   }
 
   private Region<Object/* Integer or EnumCode */, Object/* PdxType or enum info */> getIdToType() {
@@ -170,7 +168,7 @@ public class PeerTypeRegistration implements TypeRegistration {
 
       @Override
       public void beforeUpdate(EntryEvent<Object, Object> event) throws CacheWriterException {
-        if (!event.getOldValue().equals(event.getNewValue())) {
+        if (!event.getRegion().get(event.getKey()).equals(event.getNewValue())) {
           PdxRegistryMismatchException ex = new PdxRegistryMismatchException(
               "Trying to add a PDXType with the same id as an existing PDX type. id="
                   + event.getKey() + ", existing pdx type " + event.getOldValue() + ", new type "
@@ -229,43 +227,63 @@ public class PeerTypeRegistration implements TypeRegistration {
 
   private static final String LOCK_NAME = "PDX_LOCK";
 
-  private int allocateTypeId() {
+  private int allocateTypeId(PdxType newType) {
     TXStateProxy currentState = suspendTX();
     Region<Object, Object> r = getIdToType();
+
+    int id = newType.hashCode() & PLACE_HOLDER_FOR_TYPE_ID;
+    int newTypeId = id | this.dsId;
+
     try {
-      // Find the next available type id.
-      do {
-        this.nextTypeId++;
-        if (this.nextTypeId == maxTypeId) {
+      int maxTry = maxTypeId;
+      while (r.get(newTypeId) != null) {
+        maxTry--;
+        if (maxTry == 0) {
           throw new InternalGemFireError(
               "Used up all of the PDX type ids for this distributed system. The maximum number of PDX types is "
                   + maxTypeId);
         }
-      } while (r.get(nextTypeId) != null);
 
-      this.lastAllocatedTypeId = this.nextTypeId;
-      return this.nextTypeId;
+        // Find the next available type id.
+        id++;
+        if (id > this.maxTypeId) {
+          id = 1;
+        }
+        newTypeId = id | this.dsId;
+      }
+
+      return newTypeId;
     } finally {
       resumeTX(currentState);
     }
   }
 
-  private EnumId allocateEnumId() {
+  private EnumId allocateEnumId(EnumInfo ei) {
     TXStateProxy currentState = suspendTX();
     Region<Object, Object> r = getIdToType();
+
+    int id = ei.hashCode() & PLACE_HOLDER_FOR_TYPE_ID;
+    int newEnumId = id | this.dsId;
     try {
+      int maxTry = this.maxTypeId;
       // Find the next available type id.
-      do {
-        this.nextEnumId++;
-        if (this.nextEnumId == maxEnumId) {
+      while (r.get(new EnumId(newEnumId)) != null) {
+        maxTry--;
+        if (maxTry == 0) {
           throw new InternalGemFireError(
-              "Used up all of the PDX enum ids for this distributed system. The maximum number of PDX types is "
-                  + maxEnumId);
+              "Used up all of the PDX type ids for this distributed system. The maximum number of PDX types is "
+                  + this.maxTypeId);
         }
-      } while (r.get(new EnumId(nextEnumId)) != null);
 
-      this.lastAllocatedEnumId = this.nextEnumId;
-      return new EnumId(this.nextEnumId);
+        // Find the next available type id.
+        id++;
+        if (id > this.maxTypeId) {
+          id = 1;
+        }
+        newEnumId = id | this.dsId;
+      }
+
+      return new EnumId(newEnumId);
     } finally {
       resumeTX(currentState);
     }
@@ -354,7 +372,7 @@ public class PeerTypeRegistration implements TypeRegistration {
         return id;
       }
 
-      id = allocateTypeId();
+      id = allocateTypeId(newType);
       newType.setTypeId(id);
 
       updateIdToTypeRegion(newType);
@@ -527,6 +545,7 @@ public class PeerTypeRegistration implements TypeRegistration {
 
   /** Should be called holding the dlock */
   private int getExistingIdForType(PdxType newType) {
+    int totalPdxTypeIdInDS = 0;
     TXStateProxy currentState = suspendTX();
     try {
       int result = -1;
@@ -540,12 +559,22 @@ public class PeerTypeRegistration implements TypeRegistration {
         } else {
           PdxType foundType = (PdxType) v;
           Integer id = (Integer) k;
+          int tmpDsId = PLACE_HOLDER_FOR_DS_ID & id.intValue();
+          if (tmpDsId == this.dsId) {
+            totalPdxTypeIdInDS++;
+          }
+
           typeToId.put(foundType, id);
           if (foundType.equals(newType)) {
             result = foundType.getTypeId();
           }
         }
       }
+      if (totalPdxTypeIdInDS == this.maxTypeId) {
+        throw new InternalGemFireError(
+            "Used up all of the PDX type ids for this distributed system. The maximum number of PDX types is "
+                + this.maxTypeId);
+      }
       return result;
     } finally {
       resumeTX(currentState);
@@ -555,6 +584,7 @@ public class PeerTypeRegistration implements TypeRegistration {
   /** Should be called holding the dlock */
   private EnumId getExistingIdForEnum(EnumInfo ei) {
     TXStateProxy currentState = suspendTX();
+    int totalEnumIdInDS = 0;
     try {
       EnumId result = null;
       for (Map.Entry<Object, Object> entry : getIdToType().entrySet()) {
@@ -564,6 +594,10 @@ public class PeerTypeRegistration implements TypeRegistration {
           EnumId id = (EnumId) k;
           EnumInfo info = (EnumInfo) v;
           enumToId.put(info, id);
+          int tmpDsId = PLACE_HOLDER_FOR_DS_ID & id.intValue();
+          if (tmpDsId == this.dsId) {
+            totalEnumIdInDS++;
+          }
           if (ei.equals(info)) {
             result = id;
           }
@@ -571,6 +605,12 @@ public class PeerTypeRegistration implements TypeRegistration {
           typeToId.put((PdxType) v, (Integer) k);
         }
       }
+
+      if (totalEnumIdInDS == this.maxTypeId) {
+        throw new InternalGemFireError(
+            "Used up all of the PDX enum ids for this distributed system. The maximum number of PDX types is "
+                + this.maxTypeId);
+      }
       return result;
     } finally {
       resumeTX(currentState);
@@ -605,7 +645,7 @@ public class PeerTypeRegistration implements TypeRegistration {
         return id.intValue();
       }
 
-      id = allocateEnumId();
+      id = allocateEnumId(ei);
 
       updateIdToEnumRegion(id, ei);
 
@@ -652,7 +692,7 @@ public class PeerTypeRegistration implements TypeRegistration {
         return id.intValue();
       }
 
-      id = allocateEnumId();
+      id = allocateEnumId(newInfo);
 
       updateIdToEnumRegion(id, newInfo);
 

http://git-wip-us.apache.org/repos/asf/geode/blob/b0422711/geode-core/src/test/java/org/apache/geode/pdx/PdxAttributesJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/pdx/PdxAttributesJUnitTest.java b/geode-core/src/test/java/org/apache/geode/pdx/PdxAttributesJUnitTest.java
index b755abe..753fcde 100644
--- a/geode-core/src/test/java/org/apache/geode/pdx/PdxAttributesJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/pdx/PdxAttributesJUnitTest.java
@@ -18,11 +18,16 @@ import org.apache.geode.DataSerializer;
 import org.apache.geode.ToDataException;
 import org.apache.geode.cache.*;
 import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.FileUtil;
 import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.pdx.SimpleClass.SimpleEnum;
+import org.apache.geode.pdx.internal.EnumId;
+import org.apache.geode.pdx.internal.EnumInfo;
+import org.apache.geode.pdx.internal.PdxType;
 import org.apache.geode.pdx.internal.PeerTypeRegistration;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 import org.apache.geode.test.junit.categories.SerializationTest;
@@ -34,6 +39,8 @@ import org.junit.experimental.categories.Category;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
 
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.junit.Assert.assertEquals;
@@ -103,6 +110,149 @@ public class PdxAttributesJUnitTest {
   }
 
   @Test
+  public void testPdxTypeId() throws Exception {
+
+    int dsId = 5;
+    CacheFactory cf = new CacheFactory();
+    cf.set(MCAST_PORT, "0");
+    cf.set(ConfigurationProperties.DISTRIBUTED_SYSTEM_ID, String.valueOf(dsId));
+    Cache cache = cf.create();
+
+    // define a type.
+    defineAType();
+
+    Region pdxRegion = cache.getRegion(PeerTypeRegistration.REGION_NAME);
+    Iterator itr = pdxRegion.entrySet().iterator();
+
+    boolean found = false;
+    boolean foundEnum = false;
+    while (itr.hasNext()) {
+      Map.Entry ent = (Map.Entry) itr.next();
+      if (ent.getKey() instanceof Integer) {
+        int pdxTypeId = (int) ent.getKey();
+        PdxType pdxType = (PdxType) ent.getValue();
+
+        int pdxTypeHashcode = pdxType.hashCode();
+        System.out.println("pdx hashcode " + pdxTypeHashcode);
+        int expectedPdxTypeId =
+            (dsId << 24) | (PeerTypeRegistration.PLACE_HOLDER_FOR_TYPE_ID & pdxTypeHashcode);
+
+        assertEquals(expectedPdxTypeId, pdxTypeId);
+
+        found = true;
+      } else {
+        EnumId enumId = (EnumId) ent.getKey();
+        EnumInfo enumInfo = (EnumInfo) ent.getValue();
+
+        EnumInfo expectedEnumInfo = new EnumInfo(SimpleEnum.TWO);
+        int expectKey = (dsId << 24)
+            | (PeerTypeRegistration.PLACE_HOLDER_FOR_TYPE_ID & expectedEnumInfo.hashCode());;
+
+        assertEquals(expectKey, enumId.intValue());
+        foundEnum = true;
+      }
+    }
+
+    assertEquals(true, found);
+    assertEquals(true, foundEnum);
+    cache.close();
+
+  }
+
+  @Test
+  public void testDuplicatePdxTypeId() throws Exception {
+
+    int dsId = 5;
+    CacheFactory cf = new CacheFactory();
+    cf.set(MCAST_PORT, "0");
+    cf.set(ConfigurationProperties.DISTRIBUTED_SYSTEM_ID, String.valueOf(dsId));
+    Cache cache = cf.create();
+
+    // define a type.
+    defineAType();
+
+    Region pdxRegion = cache.getRegion(PeerTypeRegistration.REGION_NAME);
+    Iterator itr = pdxRegion.entrySet().iterator();
+
+    boolean foundException = false;
+    boolean foundEnumException = false;
+    while (itr.hasNext()) {
+      Map.Entry ent = (Map.Entry) itr.next();
+      if (ent.getKey() instanceof Integer) {
+        int pdxTypeId = (int) ent.getKey();
+
+        try {
+          pdxRegion.put(pdxTypeId, new PdxType());
+        } catch (CacheWriterException cwe) {
+          foundException = true;
+        }
+      } else {
+        EnumId enumId = (EnumId) ent.getKey();
+
+        EnumInfo enumInfo = new EnumInfo(SimpleEnum.ONE);
+        try {
+          pdxRegion.put(enumId, enumInfo);
+        } catch (CacheWriterException cwe) {
+          foundEnumException = true;
+        }
+      }
+    }
+
+    assertEquals(true, foundException);
+    assertEquals(true, foundEnumException);
+    cache.close();
+
+  }
+
+  @Test
+  public void testPdxTypeIdWithNegativeDsId() throws Exception {
+    // in this case geode will use 0 as dsId
+    int dsId = -1;
+    CacheFactory cf = new CacheFactory();
+    cf.set(MCAST_PORT, "0");
+    cf.set(ConfigurationProperties.DISTRIBUTED_SYSTEM_ID, String.valueOf(dsId));
+    Cache cache = cf.create();
+
+    // define a type.
+    defineAType();
+
+    Region pdxRegion = cache.getRegion(PeerTypeRegistration.REGION_NAME);
+    Iterator itr = pdxRegion.entrySet().iterator();
+
+    boolean found = false;
+    boolean foundEnum = false;
+    while (itr.hasNext()) {
+      Map.Entry ent = (Map.Entry) itr.next();
+      if (ent.getKey() instanceof Integer) {
+        int pdxTypeId = (int) ent.getKey();
+        PdxType pdxType = (PdxType) ent.getValue();
+
+        int pdxTypeHashcode = pdxType.hashCode();
+        System.out.println("pdx hashcode " + pdxTypeHashcode);
+        int expectedPdxTypeId = PeerTypeRegistration.PLACE_HOLDER_FOR_TYPE_ID & pdxTypeHashcode;
+
+        assertEquals(expectedPdxTypeId, pdxTypeId);
+
+        found = true;
+      } else {
+        EnumId enumId = (EnumId) ent.getKey();
+        EnumInfo enumInfo = (EnumInfo) ent.getValue();
+
+        EnumInfo expectedEnumInfo = new EnumInfo(SimpleEnum.TWO);
+        int expectKey =
+            PeerTypeRegistration.PLACE_HOLDER_FOR_TYPE_ID & expectedEnumInfo.hashCode();;
+
+        assertEquals(expectKey, enumId.intValue());
+        foundEnum = true;
+      }
+    }
+
+    assertEquals(true, found);
+    cache.close();
+
+  }
+
+  @Test
   public void testPdxDiskStore() throws Exception {
     {
       CacheFactory cf = new CacheFactory();

http://git-wip-us.apache.org/repos/asf/geode/blob/b0422711/geode-core/src/test/java/org/apache/geode/pdx/PdxSerializableJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/pdx/PdxSerializableJUnitTest.java b/geode-core/src/test/java/org/apache/geode/pdx/PdxSerializableJUnitTest.java
index af0eb40..72645d9 100644
--- a/geode-core/src/test/java/org/apache/geode/pdx/PdxSerializableJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/pdx/PdxSerializableJUnitTest.java
@@ -63,6 +63,7 @@ import org.apache.geode.pdx.internal.DataSize;
 import org.apache.geode.pdx.internal.PdxReaderImpl;
 import org.apache.geode.pdx.internal.PdxType;
 import org.apache.geode.pdx.internal.PdxWriterImpl;
+import org.apache.geode.pdx.internal.PeerTypeRegistration;
 import org.apache.geode.pdx.internal.TypeRegistry;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 
@@ -86,6 +87,16 @@ public class PdxSerializableJUnitTest {
     return this.c.getPdxRegistry().getLastAllocatedTypeId();
   }
 
+  private int getPdxTypeIdForClass(Class c) {
+    // here we are assuming Dsid == 0
+    return this.c.getPdxRegistry().getExistingTypeForClass(c).hashCode()
+        & PeerTypeRegistration.PLACE_HOLDER_FOR_TYPE_ID;
+  }
+
+  private int getNumPdxTypes() {
+    return this.c.getPdxRegistry().typeMap().size();
+  }
+
   @Test
   public void testNoDiskStore() throws Exception {
     this.c.close();
@@ -264,7 +275,7 @@ public class PdxSerializableJUnitTest {
     HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);
     PdxSerializable object = new SimpleClass(1, (byte) 5, null);
     DataSerializer.writeObject(object, out);
-    int typeId = getLastPdxTypeId();
+    int typeId = getPdxTypeIdForClass(SimpleClass.class);
     byte[] actual = out.toByteArray();
     byte[] expected = new byte[] {DSCODE.PDX, // byte
         0, 0, 0, 4 + 1 + 1, // int - length of byte stream = 4(myInt) + 1(myByte) + 1 (myEnum)
@@ -316,7 +327,7 @@ public class PdxSerializableJUnitTest {
     SimpleClass1 pdx =
         new SimpleClass1(myFlag, myShort, myString1, myLong, myString2, myString3, myInt, myFloat);
     DataSerializer.writeObject(pdx, out);
-    int typeId = getLastPdxTypeId();
+    int typeId = getPdxTypeIdForClass(SimpleClass1.class);
 
     HeapDataOutputStream hdos1 = new HeapDataOutputStream(Version.CURRENT);
     DataSerializer.writeString(myString1, hdos1);
@@ -543,7 +554,7 @@ public class PdxSerializableJUnitTest {
     SimpleClass1 pdx =
         new SimpleClass1(myFlag, myShort, myString1, myLong, myString2, myString3, myInt, myFloat);
     DataSerializer.writeObject(pdx, out);
-    int typeId = getLastPdxTypeId();
+    int typeId = getPdxTypeIdForClass(SimpleClass1.class);
 
     HeapDataOutputStream hdos1 = new HeapDataOutputStream(Version.CURRENT);
     DataSerializer.writeString(myString1, hdos1);
@@ -753,7 +764,7 @@ public class PdxSerializableJUnitTest {
     HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);
     PdxSerializable pdx = new NestedPdx(myString1, myLong, myHashMap, myString2, myFloat);
     DataSerializer.writeObject(pdx, out);
-    int typeId = getLastPdxTypeId();
+    int typeId = getPdxTypeIdForClass(NestedPdx.class);
 
     HeapDataOutputStream hdos1 = new HeapDataOutputStream(Version.CURRENT);
     DataSerializer.writeString(myString1, hdos1);
@@ -846,7 +857,7 @@ public class PdxSerializableJUnitTest {
     HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);
     PdxSerializable pdx = new DSInsidePdx(myString1, myLong, myDS, myString2, myFloat);
     DataSerializer.writeObject(pdx, out);
-    int typeId = getLastPdxTypeId();
+    int typeId = getPdxTypeIdForClass(DSInsidePdx.class);
 
     HeapDataOutputStream hdos1 = new HeapDataOutputStream(Version.CURRENT);
     DataSerializer.writeString(myString1, hdos1);
@@ -2078,10 +2089,12 @@ public class PdxSerializableJUnitTest {
     assertEquals(7, pdxv1.f1);
     assertEquals(0, pdxv1.f2);
 
+    int numPdxTypes = getNumPdxTypes();
     // now reserialize and make sure f2 is preserved
     byte[] v1actual = createBlob(pdxv1);
+
     int mergedTypeId = getBlobPdxTypeId(v1actual);
-    assertEquals(v3typeId + 1, mergedTypeId);
+    assertEquals(numPdxTypes + 1, getNumPdxTypes());
     TypeRegistry tr = c.getPdxRegistry();
     PdxType v3Type = tr.getType(v3typeId);
     PdxType mergedType = tr.getType(mergedTypeId);
@@ -2114,10 +2127,11 @@ public class PdxSerializableJUnitTest {
     assertEquals(0, pdxv2.f2);
     pdxv2.f2 = 23;
 
+    int numPdxTypes = getNumPdxTypes();
     // now reserialize and make sure it is version2 and not version1
     byte[] v2actual = createBlob(pdxv2);
     int v2typeId = getBlobPdxTypeId(v2actual);
-    assertEquals(v1typeId + 1, v2typeId);
+    assertEquals(numPdxTypes + 1, getNumPdxTypes());
 
     TypeRegistry tr = c.getPdxRegistry();
     PdxType v2Type = tr.getType(v2typeId);

http://git-wip-us.apache.org/repos/asf/geode/blob/b0422711/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index f9c18ec..6351933 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -2802,6 +2802,17 @@ public class WANTestBase extends JUnit4DistributedTestCase {
     }
   }
 
+  public static void validateRegionSizeOnly_PDX(String regionName, final int regionSize) {
+    final Region r = cache.getRegion(Region.SEPARATOR + regionName);
+    assertNotNull(r);
+    Awaitility.await().atMost(200, TimeUnit.SECONDS)
+        .until(
+            () -> assertEquals(
+                "Expected region entries: " + regionSize + " but actual entries: "
+                    + r.keySet().size() + " present region keyset " + r.keySet(),
+                true, (regionSize <= r.keySet().size())));
+  }
+
   public static void validateQueueSizeStat(String id, final int queueSize) {
     final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id);
     Awaitility.await().atMost(30, TimeUnit.SECONDS)

http://git-wip-us.apache.org/repos/asf/geode/blob/b0422711/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java
index f1e8f42..38a91e6 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java
@@ -113,6 +113,67 @@ public class PDXNewWanDUnitTest extends WANTestBase {
     vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + "_RR", 2));
   }
 
+
+  @Test
+  public void testWANPDX_CacheWriterCheck() {
+    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    vm2.invoke(() -> setSystemProperty("gemfire.disk.recoverValues", "false"));
+
+    vm2.invoke(() -> WANTestBase.createReceiver_PDX(nyPort));
+
+    vm3.invoke(() -> WANTestBase.createCache_PDX(lnPort));
+
+    vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true));
+
+    vm2.invoke(
+        () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
+
+    vm3.invoke(() -> WANTestBase.startSender("ln"));
+
+    vm3.invoke(
+        () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
+
+    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(getTestMethodName() + "_RR", 1));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + "_RR", 1));
+
+    // Close VM2 cache
+    vm2.invoke(() -> WANTestBase.closeCache());
+
+    // do some puts on VM3 and create extra pdx id
+    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable2(getTestMethodName() + "_RR", 2));
+
+    // start cache in vm2 again, now it should receive pdx id from vm3
+    vm2.invoke(() -> WANTestBase.createReceiver_PDX(nyPort));
+
+    vm2.invoke(
+        () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
+
+
+    try {
+      Wait.pause(10000);
+      // Define a different type from vm3
+      vm3.invoke(() -> WANTestBase.doPutsPDXSerializable2(getTestMethodName() + "_RR", 2));
+
+      // Give the updates some time to make it over the WAN
+      Wait.pause(10000);
+
+      vm2.invoke(() -> WANTestBase.validateRegionSizeOnly_PDX(getTestMethodName() + "_RR", 2));
+
+      vm3.invoke(() -> WANTestBase.closeCache());
+
+      vm2.invoke(() -> WANTestBase.closeCache());
+    } finally {
+      vm2.invoke(() -> setSystemProperty("gemfire.disk.recoverValues", "true"));
+    }
+  }
+
+  private void setSystemProperty(String key, String value) {
+    System.setProperty(key, value);
+  }
+
   /**
    * Test 1> Site 1 : 1 locator, 1 member 2> Site 2 : 1 locator, 1 member 3> DR is defined on member
    * 1 on site1 4> Serial GatewaySender is defined on member 1 on site1 5> Same DR is defined on
@@ -173,7 +234,7 @@ public class PDXNewWanDUnitTest extends WANTestBase {
       // Give the updates some time to make it over the WAN
       Wait.pause(10000);
 
-      vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + "_RR", 1));
+      vm2.invoke(() -> WANTestBase.validateRegionSizeOnly_PDX(getTestMethodName() + "_RR", 2));
 
       vm3.invoke(() -> WANTestBase.closeCache());
     } finally {