You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/03/04 18:30:14 UTC
[02/14] incubator-geode git commit: GEODE-967: Changed to serialize
substitute value only if necessary
GEODE-967: Changed to serialize substitute value only if necessary
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/d067f41d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/d067f41d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/d067f41d
Branch: refs/heads/feature/GEODE-949-2
Commit: d067f41d5411e54f1fa83caca6c3494182b6fe78
Parents: aa92530
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Thu Feb 25 16:56:12 2016 -0800
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Tue Mar 1 10:13:05 2016 -0800
----------------------------------------------------------------------
.../cache/wan/GatewaySenderEventImpl.java | 38 +++++++--
.../cache/wan/AsyncEventQueueTestBase.java | 86 +++++++++++++++++++-
.../asyncqueue/AsyncEventListenerDUnitTest.java | 21 ++++-
3 files changed, 132 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d067f41d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java
index e19d7bf..0e506f7 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java
@@ -518,6 +518,9 @@ public class GatewaySenderEventImpl implements
}
Object rawValue = this.value;
if (rawValue == null) {
+ rawValue = this.substituteValue;
+ }
+ if (rawValue == null) {
@Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
Object vo = this.valueObj;
if (vo instanceof StoredObject) {
@@ -548,6 +551,8 @@ public class GatewaySenderEventImpl implements
@Retained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
Object result = this.value;
if (result == null) {
+ result = this.substituteValue;
+ if (result == null) {
result = this.valueObj;
if (result instanceof ObjectChunk) {
if (this.valueObjReleased) {
@@ -562,6 +567,7 @@ public class GatewaySenderEventImpl implements
}
}
}
+ }
}
return result;
}
@@ -582,7 +588,6 @@ public class GatewaySenderEventImpl implements
* @return this event's deserialized value
*/
public Object getDeserializedValue() {
-// TODO OFFHEAP MERGE: handle substituteValue here?
if (this.valueIsObject == 0x00) {
Object result = this.value;
if (result == null) {
@@ -616,6 +621,9 @@ public class GatewaySenderEventImpl implements
Object result = EntryEventImpl.deserialize(this.value);
this.valueObj = result;
return result;
+ } else if (this.substituteValue != null) {
+ // If the substitute value is set, return it.
+ return this.substituteValue;
} else {
if (this.valueObjReleased) {
throw new IllegalStateException("Value is no longer available. getDeserializedValue must be called before processEvents returns.");
@@ -633,8 +641,10 @@ public class GatewaySenderEventImpl implements
* the value. This is a debugging exception.
*/
public String getValueAsString(boolean deserialize) {
-// TODO OFFHEAP MERGE: handle substituteValue here?
Object v = this.value;
+ if (v == null) {
+ v = this.substituteValue;
+ }
if (deserialize) {
try {
v = getDeserializedValue();
@@ -672,6 +682,13 @@ public class GatewaySenderEventImpl implements
public byte[] getSerializedValue() {
byte[] result = this.value;
if (result == null) {
+ if (this.substituteValue != null) {
+ // The substitute value is set. Serialize it
+ isSerializingValue.set(Boolean.TRUE);
+ result = EntryEventImpl.serialize(this.substituteValue);
+ isSerializingValue.set(Boolean.FALSE);
+ return result;
+ }
@Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
Object vo = this.valueObj;
if (vo instanceof StoredObject) {
@@ -687,7 +704,9 @@ public class GatewaySenderEventImpl implements
synchronized (this) {
result = this.value;
if (result == null && vo != null && !(vo instanceof Token)) {
+ isSerializingValue.set(Boolean.TRUE);
result = EntryEventImpl.serialize(vo);
+ isSerializingValue.set(Boolean.FALSE);
this.value = result;
} else if (result == null) {
if (this.valueObjReleased) {
@@ -982,15 +1001,12 @@ public class GatewaySenderEventImpl implements
this.value = (byte[]) this.substituteValue;
this.valueIsObject = 0x00;
} else if (this.substituteValue == TOKEN_NULL) {
- // The substituteValue represents null. Set the value to null.
+ // The substituteValue represents null. Set the value and substituteValue to null.
this.value = null;
+ this.substituteValue = null;
this.valueIsObject = 0x01;
} else {
- // The substituteValue is an object. Serialize it.
- isSerializingValue.set(Boolean.TRUE);
- this.value = CacheServerHelper.serialize(this.substituteValue);
- isSerializingValue.set(Boolean.FALSE);
- event.setCachedSerializedNewValue(this.value);
+ // The substituteValue is an object. Leave it as is.
this.valueIsObject = 0x01;
}
}
@@ -1219,7 +1235,11 @@ public class GatewaySenderEventImpl implements
if (vo instanceof StoredObject) {
return ((StoredObject) vo).getSizeInBytes();
} else {
+ if (this.substituteValue != null) {
+ return sizeOf(this.substituteValue);
+ } else {
return CachedDeserializableFactory.calcMemSize(getSerializedValue());
+ }
}
}
@@ -1247,7 +1267,7 @@ public class GatewaySenderEventImpl implements
* If it was stored off-heap and is no longer available (because it was released) then return null.
*/
public GatewaySenderEventImpl makeHeapCopyIfOffHeap() {
- if (this.value != null) {
+ if (this.value != null || this.substituteValue != null) {
// we have the value stored on the heap so return this
return this;
} else {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d067f41d/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
index 93e91c9..d834017 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -16,6 +16,8 @@
*/
package com.gemstone.gemfire.internal.cache.wan;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -34,6 +36,8 @@ import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import com.gemstone.gemfire.DataSerializable;
+import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheClosedException;
@@ -72,6 +76,7 @@ import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.lru.Sizeable;
import com.gemstone.gemfire.test.dunit.Assert;
import com.gemstone.gemfire.test.dunit.DistributedTestCase;
import com.gemstone.gemfire.test.dunit.IgnoredException;
@@ -1375,20 +1380,20 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
}
}
- public static void verifySubstitutionFilterInvocations(String asyncEventQueueId, int numInvocations) {
+ public static void verifySubstitutionFilterInvocations(String asyncEventQueueId, int expectedNumInvocations) {
AsyncEventQueue queue = cache.getAsyncEventQueue(asyncEventQueueId);
assertNotNull(queue);
// Verify the GatewayEventSubstitutionFilter has been invoked the appropriate number of times
MyGatewayEventSubstitutionFilter filter = (MyGatewayEventSubstitutionFilter) queue.getGatewayEventSubstitutionFilter();
assertNotNull(filter);
- assertEquals(numInvocations, filter.getNumInvocations());
+ assertEquals(expectedNumInvocations, filter.getNumInvocations());
// Verify the AsyncEventListener has received the substituted values
MyAsyncEventListener listener = (MyAsyncEventListener) queue.getAsyncEventListener();
final Map eventsMap = listener.getEventsMap();
assertNotNull(eventsMap);
- assertEquals(numInvocations, eventsMap.size());
+ assertEquals(expectedNumInvocations, eventsMap.size());
for (Iterator i = eventsMap.entrySet().iterator(); i.hasNext();) {
Map.Entry<Integer,String> entry = (Map.Entry<Integer,String>) i.next();
@@ -1396,6 +1401,17 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
}
}
+
+ public static void verifySubstitutionFilterToDataInvocations(String asyncEventQueueId, int expectedToDataInvoations) {
+ AsyncEventQueue queue = cache.getAsyncEventQueue(asyncEventQueueId);
+ assertNotNull(queue);
+
+ // Verify the GatewayEventSubstitutionFilter has been invoked the appropriate number of times
+ SizeableGatewayEventSubstitutionFilter filter = (SizeableGatewayEventSubstitutionFilter) queue.getGatewayEventSubstitutionFilter();
+ assertNotNull(filter);
+ assertEquals(expectedToDataInvoations, filter.getNumToDataInvocations());
+ }
+
public static int getAsyncEventListenerMapSize(String asyncEventQueueId) {
AsyncEventListener theListener = null;
@@ -1649,6 +1665,70 @@ class MyCacheLoader implements CacheLoader, Declarable {
}
+class SizeableGatewayEventSubstitutionFilter implements GatewayEventSubstitutionFilter, Declarable {
+
+ private AtomicInteger numToDataInvocations = new AtomicInteger();
+
+ protected static final String SUBSTITUTION_PREFIX = "substituted_";
+
+ public Object getSubstituteValue(EntryEvent event) {
+ return new GatewayEventSubstituteObject(this, SUBSTITUTION_PREFIX + event.getKey());
+ }
+
+ public void close() {
+ }
+
+ public void init(Properties properties) {
+ }
+
+ protected void incNumToDataInvocations() {
+ this.numToDataInvocations.incrementAndGet();
+ }
+
+ protected int getNumToDataInvocations() {
+ return this.numToDataInvocations.get();
+ }
+}
+
+class GatewayEventSubstituteObject implements DataSerializable, Sizeable {
+
+ private String id;
+
+ private SizeableGatewayEventSubstitutionFilter filter;
+
+ public GatewayEventSubstituteObject(SizeableGatewayEventSubstitutionFilter filter, String id) {
+ this.filter = filter;
+ this.id = id;
+ }
+
+ public String getId() {
+ return this.id;
+ }
+
+ public void toData(DataOutput out) throws IOException {
+ this.filter.incNumToDataInvocations();
+ DataSerializer.writeString(this.id, out);
+ }
+
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ this.id = DataSerializer.readString(in);
+ }
+
+ public int getSizeInBytes() {
+ return 0;
+ }
+
+ public String toString() {
+ return new StringBuilder()
+ .append(getClass().getSimpleName())
+ .append("[")
+ .append("id=")
+ .append(this.id)
+ .append("]")
+ .toString();
+ }
+}
+
class MyGatewayEventSubstitutionFilter implements GatewayEventSubstitutionFilter, Declarable {
private AtomicInteger numInvocations = new AtomicInteger();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d067f41d/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index 59b2caa..779fc75 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -943,7 +943,26 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" ));
- vm4.invoke(() -> verifySubstitutionFilterInvocations( "ln" , numPuts ));
+ vm4.invoke(() -> verifySubstitutionFilterInvocations( "ln" ,numPuts ));
+ }
+
+ public void testParallelAsyncEventQueueWithSubstitutionFilterNoSubstituteValueToDataInvocations() {
+ Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
+
+ vm4.invoke(createCacheRunnable(lnPort));
+
+ vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
+ true, 100, 100, false, false, null, false, "MyAsyncEventListener", "SizeableGatewayEventSubstitutionFilter" ));
+
+ String regionName = getTestMethodName() + "_PR";
+ vm4.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( regionName, "ln", isOffHeap() ));
+
+ int numPuts = 10;
+ vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( regionName, numPuts ));
+
+ vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" ));
+
+ vm4.invoke(() -> verifySubstitutionFilterToDataInvocations( "ln" ,0 ));
}
/**