You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/03/04 18:30:14 UTC

[02/14] incubator-geode git commit: GEODE-967: Changed to serialize substitute value only if necessary

GEODE-967: Changed to serialize substitute value only if necessary


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/d067f41d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/d067f41d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/d067f41d

Branch: refs/heads/feature/GEODE-949-2
Commit: d067f41d5411e54f1fa83caca6c3494182b6fe78
Parents: aa92530
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Thu Feb 25 16:56:12 2016 -0800
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Tue Mar 1 10:13:05 2016 -0800

----------------------------------------------------------------------
 .../cache/wan/GatewaySenderEventImpl.java       | 38 +++++++--
 .../cache/wan/AsyncEventQueueTestBase.java      | 86 +++++++++++++++++++-
 .../asyncqueue/AsyncEventListenerDUnitTest.java | 21 ++++-
 3 files changed, 132 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d067f41d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java
index e19d7bf..0e506f7 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java
@@ -518,6 +518,9 @@ public class GatewaySenderEventImpl implements
     }
     Object rawValue = this.value;
     if (rawValue == null) {
+      rawValue = this.substituteValue;
+    }
+    if (rawValue == null) {
       @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
       Object vo = this.valueObj;
       if (vo instanceof StoredObject) {
@@ -548,6 +551,8 @@ public class GatewaySenderEventImpl implements
     @Retained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
     Object result = this.value;
     if (result == null) {
+      result = this.substituteValue;
+      if (result == null) {
       result = this.valueObj;
       if (result instanceof ObjectChunk) {
         if (this.valueObjReleased) {
@@ -562,6 +567,7 @@ public class GatewaySenderEventImpl implements
           }
         }
       }
+      }
     }
     return result;
   }
@@ -582,7 +588,6 @@ public class GatewaySenderEventImpl implements
    * @return this event's deserialized value
    */
   public Object getDeserializedValue() {
-// TODO OFFHEAP MERGE: handle substituteValue here?
     if (this.valueIsObject == 0x00) {
       Object result = this.value;
       if (result == null) {
@@ -616,6 +621,9 @@ public class GatewaySenderEventImpl implements
           Object result = EntryEventImpl.deserialize(this.value);
           this.valueObj = result;
           return result;
+        } else if (this.substituteValue != null) {
+          // If the substitute value is set, return it.
+          return this.substituteValue;
         } else {
           if (this.valueObjReleased) {
             throw new IllegalStateException("Value is no longer available. getDeserializedValue must be called before processEvents returns.");
@@ -633,8 +641,10 @@ public class GatewaySenderEventImpl implements
    * the value. This is a debugging exception.
    */
   public String getValueAsString(boolean deserialize) {
-// TODO OFFHEAP MERGE: handle substituteValue here?
     Object v = this.value;
+    if (v == null) {
+      v = this.substituteValue;
+    }
     if (deserialize) {
       try {
         v = getDeserializedValue();
@@ -672,6 +682,13 @@ public class GatewaySenderEventImpl implements
   public byte[] getSerializedValue() {
     byte[] result = this.value;
     if (result == null) {
+      if (this.substituteValue != null) {
+        // The substitute value is set. Serialize it
+        isSerializingValue.set(Boolean.TRUE);
+        result = EntryEventImpl.serialize(this.substituteValue);
+        isSerializingValue.set(Boolean.FALSE);
+        return result;
+      }
       @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
       Object vo = this.valueObj;
       if (vo instanceof StoredObject) {
@@ -687,7 +704,9 @@ public class GatewaySenderEventImpl implements
         synchronized (this) {
           result = this.value;
           if (result == null && vo != null && !(vo instanceof Token)) {
+            isSerializingValue.set(Boolean.TRUE);
             result = EntryEventImpl.serialize(vo);
+            isSerializingValue.set(Boolean.FALSE);
             this.value = result;
           } else if (result == null) {
             if (this.valueObjReleased) {
@@ -982,15 +1001,12 @@ public class GatewaySenderEventImpl implements
         this.value = (byte[]) this.substituteValue;
         this.valueIsObject = 0x00;
       } else if (this.substituteValue == TOKEN_NULL) {
-        // The substituteValue represents null. Set the value to null.
+        // The substituteValue represents null. Set the value and substituteValue to null.
         this.value = null;
+        this.substituteValue = null;
         this.valueIsObject = 0x01;
       } else {
-        // The substituteValue is an object. Serialize it.
-        isSerializingValue.set(Boolean.TRUE);
-        this.value = CacheServerHelper.serialize(this.substituteValue);
-        isSerializingValue.set(Boolean.FALSE);
-        event.setCachedSerializedNewValue(this.value);
+        // The substituteValue is an object. Leave it as is.
         this.valueIsObject = 0x01;
       }
     }
@@ -1219,7 +1235,11 @@ public class GatewaySenderEventImpl implements
     if (vo instanceof StoredObject) {
       return ((StoredObject) vo).getSizeInBytes();
     } else {
+      if (this.substituteValue != null) {
+        return sizeOf(this.substituteValue);
+      } else {
       return CachedDeserializableFactory.calcMemSize(getSerializedValue());
+      }
     }
   }
   
@@ -1247,7 +1267,7 @@ public class GatewaySenderEventImpl implements
    * If it was stored off-heap and is no longer available (because it was released) then return null.
    */
   public GatewaySenderEventImpl makeHeapCopyIfOffHeap() {
-    if (this.value != null) {
+    if (this.value != null || this.substituteValue != null) {
       // we have the value stored on the heap so return this
       return this;
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d067f41d/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
index 93e91c9..d834017 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -16,6 +16,8 @@
  */
 package com.gemstone.gemfire.internal.cache.wan;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -34,6 +36,8 @@ import java.util.StringTokenizer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.gemstone.gemfire.DataSerializable;
+import com.gemstone.gemfire.DataSerializer;
 import com.gemstone.gemfire.cache.AttributesFactory;
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.CacheClosedException;
@@ -72,6 +76,7 @@ import com.gemstone.gemfire.internal.cache.ForceReattemptException;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.lru.Sizeable;
 import com.gemstone.gemfire.test.dunit.Assert;
 import com.gemstone.gemfire.test.dunit.DistributedTestCase;
 import com.gemstone.gemfire.test.dunit.IgnoredException;
@@ -1375,20 +1380,20 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
     }
   }
 
-  public static void verifySubstitutionFilterInvocations(String asyncEventQueueId, int numInvocations) {
+  public static void verifySubstitutionFilterInvocations(String asyncEventQueueId, int expectedNumInvocations) {
     AsyncEventQueue queue = cache.getAsyncEventQueue(asyncEventQueueId);
     assertNotNull(queue);
 
     // Verify the GatewayEventSubstitutionFilter has been invoked the appropriate number of times
     MyGatewayEventSubstitutionFilter filter = (MyGatewayEventSubstitutionFilter) queue.getGatewayEventSubstitutionFilter();
     assertNotNull(filter);
-    assertEquals(numInvocations, filter.getNumInvocations());
+    assertEquals(expectedNumInvocations, filter.getNumInvocations());
 
     // Verify the AsyncEventListener has received the substituted values
     MyAsyncEventListener listener = (MyAsyncEventListener) queue.getAsyncEventListener();
     final Map eventsMap = listener.getEventsMap();
     assertNotNull(eventsMap);
-    assertEquals(numInvocations, eventsMap.size());
+    assertEquals(expectedNumInvocations, eventsMap.size());
 
     for (Iterator i = eventsMap.entrySet().iterator(); i.hasNext();) {
       Map.Entry<Integer,String> entry = (Map.Entry<Integer,String>) i.next();
@@ -1396,6 +1401,17 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
     }
   }
 
+
+  public static void verifySubstitutionFilterToDataInvocations(String asyncEventQueueId, int expectedToDataInvoations) {
+    AsyncEventQueue queue = cache.getAsyncEventQueue(asyncEventQueueId);
+    assertNotNull(queue);
+
+    // Verify that toData has been invoked on the substitute values the expected number of times
+    SizeableGatewayEventSubstitutionFilter filter = (SizeableGatewayEventSubstitutionFilter) queue.getGatewayEventSubstitutionFilter();
+    assertNotNull(filter);
+    assertEquals(expectedToDataInvoations, filter.getNumToDataInvocations());
+  }
+
   public static int getAsyncEventListenerMapSize(String asyncEventQueueId) {
     AsyncEventListener theListener = null;
 
@@ -1649,6 +1665,70 @@ class MyCacheLoader implements CacheLoader, Declarable {
 
 }
 
+class SizeableGatewayEventSubstitutionFilter implements GatewayEventSubstitutionFilter, Declarable {
+
+  private AtomicInteger numToDataInvocations = new AtomicInteger();
+
+  protected static final String SUBSTITUTION_PREFIX = "substituted_";
+
+  public Object getSubstituteValue(EntryEvent event) {
+    return new GatewayEventSubstituteObject(this, SUBSTITUTION_PREFIX + event.getKey());
+  }
+
+  public void close() {
+  }
+
+  public void init(Properties properties) {
+  }
+
+  protected void incNumToDataInvocations() {
+    this.numToDataInvocations.incrementAndGet();
+  }
+
+  protected int getNumToDataInvocations() {
+    return this.numToDataInvocations.get();
+  }
+}
+
+class GatewayEventSubstituteObject implements DataSerializable, Sizeable {
+
+  private String id;
+
+  private SizeableGatewayEventSubstitutionFilter filter;
+
+  public GatewayEventSubstituteObject(SizeableGatewayEventSubstitutionFilter filter, String id) {
+    this.filter = filter;
+    this.id = id;
+  }
+
+  public String getId() {
+    return this.id;
+  }
+
+  public void toData(DataOutput out) throws IOException {
+    this.filter.incNumToDataInvocations();
+    DataSerializer.writeString(this.id, out);
+  }
+
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.id = DataSerializer.readString(in);
+  }
+
+  public int getSizeInBytes() {
+    return 0;
+  }
+
+  public String toString() {
+    return new StringBuilder()
+        .append(getClass().getSimpleName())
+        .append("[")
+        .append("id=")
+        .append(this.id)
+        .append("]")
+        .toString();
+  }
+}
+
 class MyGatewayEventSubstitutionFilter implements GatewayEventSubstitutionFilter, Declarable {
 
   private AtomicInteger numInvocations = new AtomicInteger();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d067f41d/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index 59b2caa..779fc75 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -943,7 +943,26 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
 
     vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" ));
 
-    vm4.invoke(() -> verifySubstitutionFilterInvocations( "ln" , numPuts ));
+    vm4.invoke(() -> verifySubstitutionFilterInvocations( "ln" ,numPuts ));
+  }
+
+  public void testParallelAsyncEventQueueWithSubstitutionFilterNoSubstituteValueToDataInvocations() {
+    Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
+
+    vm4.invoke(createCacheRunnable(lnPort));
+
+    vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
+        true, 100, 100, false, false, null, false, "MyAsyncEventListener", "SizeableGatewayEventSubstitutionFilter" ));
+
+    String regionName = getTestMethodName() + "_PR";
+    vm4.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( regionName, "ln", isOffHeap() ));
+
+    int numPuts = 10;
+    vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( regionName, numPuts ));
+
+    vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" ));
+
+    vm4.invoke(() -> verifySubstitutionFilterToDataInvocations( "ln" ,0 ));
   }
 
   /**