You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2019/02/15 22:27:36 UTC
[geode] branch develop updated: GEODE-6391: Adding the event ID to
the messages (#3184)
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new b7da1f5 GEODE-6391: Adding the event ID to the messages (#3184)
b7da1f5 is described below
commit b7da1f57abbde2ded2b6bbcb2459e269a9c5477a
Author: Nabarun Nag <na...@users.noreply.github.com>
AuthorDate: Fri Feb 15 14:27:21 2019 -0800
GEODE-6391: Adding the event ID to the messages (#3184)
* Event id is included in the DestroyPartitionedRegionMessage and InvalidatePartitionedRegionMessage
* This is to prevent the NPE
* The NPE occurs when server tries to notify the clients after receiving the message
* While getting the thread id and sequence id it ends up with an NPE and event id is null.
---
.../codeAnalysis/sanctionedDataSerializables.txt | 14 ++-
.../cache/DestroyPartitionedRegionMessage.java | 33 +++++-
.../cache/InvalidatePartitionedRegionMessage.java | 38 +++++--
.../geode/internal/cache/RegionEventImpl.java | 3 -
.../DestroyPartitionedRegionMessageDUnitTest.java | 124 +++++++++++++++++++++
...nvalidatePartitionedRegionMessageDUnitTest.java | 115 +++++++++++++++++++
6 files changed, 311 insertions(+), 16 deletions(-)
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 3117ff4..97798d0 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -895,9 +895,11 @@ org/apache/geode/internal/cache/DestroyOperation$DestroyWithContextMessage,2
fromData,14
toData,14
-org/apache/geode/internal/cache/DestroyPartitionedRegionMessage,2
-fromData,76
-toData,77
+org/apache/geode/internal/cache/DestroyPartitionedRegionMessage,4
+fromData,17
+fromDataPre_GEODE_1_9_0_0,76
+toData,14
+toDataPre_GEODE_1_9_0_0,77
org/apache/geode/internal/cache/DestroyRegionOperation$DestroyRegionMessage,2
fromData,41
@@ -1115,9 +1117,11 @@ org/apache/geode/internal/cache/InvalidateOperation$InvalidateWithContextMessage
fromData,14
toData,14
-org/apache/geode/internal/cache/InvalidatePartitionedRegionMessage,2
-fromData,14
+org/apache/geode/internal/cache/InvalidatePartitionedRegionMessage,4
+fromData,17
+fromDataPre_GEODE_1_9_0_0,14
toData,14
+toDataPre_GEODE_1_9_0_0,14
org/apache/geode/internal/cache/InvalidateRegionOperation$InvalidateRegionMessage,2
fromData,17
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyPartitionedRegionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyPartitionedRegionMessage.java
index 5fbe876..65c0fa3 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyPartitionedRegionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyPartitionedRegionMessage.java
@@ -33,6 +33,7 @@ import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.partitioned.PartitionMessage;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor.PartitionProfile;
@@ -72,6 +73,15 @@ public class DestroyPartitionedRegionMessage extends PartitionMessage {
/** Serial numbers of the buckets for this region */
private int bucketSerials[];
+ /** Event ID of the destroy operation created at the origin */
+ private EventID eventID;
+
+ @Override
+ public EventID getEventID() {
+ return eventID;
+ }
+
+
/**
* Empty constructor to satisfy {@link DataSerializer} requirements
*/
@@ -92,6 +102,7 @@ public class DestroyPartitionedRegionMessage extends PartitionMessage {
this.prSerial = region.getSerialNumber();
Assert.assertTrue(this.prSerial != DistributionAdvisor.ILLEGAL_SERIAL);
this.bucketSerials = serials;
+ this.eventID = event.getEventId();
}
/**
@@ -177,7 +188,8 @@ public class DestroyPartitionedRegionMessage extends PartitionMessage {
logger.trace(LogMarker.DM_VERBOSE, "{} operateOnRegion: {}", getClass().getName(),
r.getFullPath());
}
- RegionEventImpl event = new RegionEventImpl(r, this.op, this.cbArg, true, r.getMyId());
+ RegionEventImpl event =
+ new RegionEventImpl(r, this.op, this.cbArg, true, r.getMyId(), getEventID());
r.basicDestroyRegion(event, false, false, true);
return true;
@@ -203,7 +215,19 @@ public class DestroyPartitionedRegionMessage extends PartitionMessage {
}
@Override
+ public Version[] getSerializationVersions() {
+ return new Version[] {Version.GEODE_190};
+ }
+
+
+ @Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ fromDataPre_GEODE_1_9_0_0(in);
+ this.eventID = DataSerializer.readObject(in);
+
+ }
+
+ public void fromDataPre_GEODE_1_9_0_0(DataInput in) throws IOException, ClassNotFoundException {
super.fromData(in);
this.cbArg = DataSerializer.readObject(in);
this.op = Operation.fromOrdinal(in.readByte());
@@ -217,6 +241,11 @@ public class DestroyPartitionedRegionMessage extends PartitionMessage {
@Override
public void toData(DataOutput out) throws IOException {
+ toDataPre_GEODE_1_9_0_0(out);
+ DataSerializer.writeObject(this.eventID, out);
+ }
+
+ public void toDataPre_GEODE_1_9_0_0(DataOutput out) throws IOException {
super.toData(out);
DataSerializer.writeObject(this.cbArg, out);
out.writeByte(this.op.ordinal);
@@ -227,6 +256,8 @@ public class DestroyPartitionedRegionMessage extends PartitionMessage {
}
}
+
+
/**
* The response on which to wait for all the replies. This response ignores any exceptions
* received from the "far side"
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidatePartitionedRegionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidatePartitionedRegionMessage.java
index 991f302..aabacae 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidatePartitionedRegionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidatePartitionedRegionMessage.java
@@ -25,24 +25,33 @@ import org.apache.geode.cache.Operation;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.partitioned.PartitionMessage;
public class InvalidatePartitionedRegionMessage extends PartitionMessage {
private Object callbackArg;
+ @Override
+ public EventID getEventID() {
+ return eventID;
+ }
+
+ private EventID eventID;
+
public InvalidatePartitionedRegionMessage() {}
public InvalidatePartitionedRegionMessage(Set recipients, Object callbackArg, PartitionedRegion r,
- ReplyProcessor21 processor) {
+ ReplyProcessor21 processor, EventID eventID) {
super(recipients, r.getPRId(), processor);
this.callbackArg = callbackArg;
+ this.eventID = eventID;
}
public static ReplyProcessor21 send(Set recipients, PartitionedRegion r, RegionEventImpl event) {
ReplyProcessor21 response = new ReplyProcessor21(r.getSystem(), recipients);
InvalidatePartitionedRegionMessage msg = new InvalidatePartitionedRegionMessage(recipients,
- event.getCallbackArgument(), r, response);
+ event.getCallbackArgument(), r, response, event.getEventId());
msg.setTransactionDistributed(r.getCache().getTxManager().isDistributed());
r.getSystem().getDistributionManager().putOutgoing(msg);
return response;
@@ -62,7 +71,7 @@ public class InvalidatePartitionedRegionMessage extends PartitionMessage {
throws CacheException, QueryException, ForceReattemptException, InterruptedException {
RegionEventImpl event = new RegionEventImpl(pr, Operation.REGION_INVALIDATE, this.callbackArg,
- !dm.getId().equals(getSender()), getSender());
+ !dm.getId().equals(getSender()), getSender(), getEventID());
pr.basicInvalidateRegion(event);
return true;
}
@@ -77,6 +86,11 @@ public class InvalidatePartitionedRegionMessage extends PartitionMessage {
return INVALIDATE_PARTITIONED_REGION_MESSAGE;
}
+ public void fromDataPre_GEODE_1_9_0_0(DataInput in) throws IOException, ClassNotFoundException {
+ super.fromData(in);
+ this.callbackArg = DataSerializer.readObject(in);
+ }
+
/*
* (non-Javadoc)
*
@@ -84,8 +98,13 @@ public class InvalidatePartitionedRegionMessage extends PartitionMessage {
*/
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
- super.fromData(in);
- this.callbackArg = DataSerializer.readObject(in);
+ fromDataPre_GEODE_1_9_0_0(in);
+ this.eventID = DataSerializer.readObject(in);
+ }
+
+ public void toDataPre_GEODE_1_9_0_0(DataOutput out) throws IOException {
+ super.toData(out);
+ DataSerializer.writeObject(this.callbackArg, out);
}
/*
@@ -95,7 +114,12 @@ public class InvalidatePartitionedRegionMessage extends PartitionMessage {
*/
@Override
public void toData(DataOutput out) throws IOException {
- super.toData(out);
- DataSerializer.writeObject(this.callbackArg, out);
+ toDataPre_GEODE_1_9_0_0(out);
+ DataSerializer.writeObject(this.eventID, out);
+ }
+
+ @Override
+ public Version[] getSerializationVersions() {
+ return new Version[] {Version.GEODE_190};
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
index 9485b64..fc339cd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
@@ -26,7 +26,6 @@ import org.apache.geode.cache.RegionEvent;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Assert;
import org.apache.geode.internal.DSFIDFactory;
import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.InternalDataSerializer;
@@ -105,8 +104,6 @@ public class RegionEventImpl
this.callbackArgument = callbackArgument;
this.originRemote = originRemote;
this.distributedMember = distributedMember;
- // TODO:ASIF: Remove this Assert from production env.
- Assert.assertTrue(eventID != null);
this.eventId = eventID;
}
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/DestroyPartitionedRegionMessageDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/DestroyPartitionedRegionMessageDUnitTest.java
new file mode 100644
index 0000000..d2eed8d
--- /dev/null
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/DestroyPartitionedRegionMessageDUnitTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.data.Portfolio;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+import org.apache.geode.test.junit.rules.VMProvider;
+
+@RunWith(JUnitParamsRunner.class)
+@Category(ClientSubscriptionTest.class)
+public class DestroyPartitionedRegionMessageDUnitTest implements Serializable {
+ private MemberVM server1, server2;
+ private TestCqListener testListener;
+
+ @Rule
+ public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+ @Before
+ public void before() throws Exception {
+ MemberVM locator = clusterStartupRule.startLocatorVM(0, new Properties());
+ Integer locator1Port = locator.getPort();
+ server1 = clusterStartupRule.startServerVM(1, locator1Port);
+ server2 = clusterStartupRule.startServerVM(2, locator1Port);
+
+ VMProvider.invokeInEveryMember(() -> {
+ InternalCache cache = ClusterStartupRule.getCache();
+ assertThat(cache).isNotNull();
+ cache.createRegionFactory(RegionShortcut.PARTITION).create("region");
+ }, server1, server2);
+
+ ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+ clientCacheFactory.addPoolServer("localhost", server1.getPort());
+ clientCacheFactory.setPoolSubscriptionEnabled(true);
+ ClientCache clientCache = clientCacheFactory.create();
+ clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
+
+ QueryService queryService = clientCache.getQueryService();
+ CqAttributesFactory cqaf = new CqAttributesFactory();
+ testListener = new TestCqListener();
+ cqaf.addCqListener(testListener);
+ CqAttributes cqAttributes = cqaf.create();
+
+ queryService.newCq("Select * from /region r where r.ID + 3 > 4", cqAttributes).execute();
+ }
+
+ @Test
+ @Parameters({"1", "2"})
+ @TestCaseName("[{index}] {method}: server{params}")
+ public void closeMethodShouldBeCalledWhenRegionIsDestroyed(int serverIndex) {
+ // The test is run twice with destroy being invoked in each server
+ clusterStartupRule.getMember(serverIndex).invoke(() -> {
+ InternalCache cache = ClusterStartupRule.getCache();
+ assertThat(cache).isNotNull();
+ Region<Integer, Portfolio> regionOnServer = cache.getRegion("region");
+ regionOnServer.destroyRegion();
+ });
+
+ // Wait until region destroy operation has been distributed.
+ VMProvider.invokeInEveryMember(() -> {
+ InternalCache cache = ClusterStartupRule.getCache();
+ assertThat(cache).isNotNull();
+ await().until(() -> cache.getRegion("region") == null);
+ }, server1, server2);
+
+ assertThat(testListener.closeInvoked.get()).isTrue();
+ }
+
+ private class TestCqListener implements CqListener, Serializable {
+ AtomicBoolean closeInvoked = new AtomicBoolean();
+
+ @Override
+ public void onEvent(CqEvent aCqEvent) {}
+
+ @Override
+ public void onError(CqEvent aCqEvent) {}
+
+ @Override
+ public void close() {
+ closeInvoked.set(true);
+ }
+ }
+}
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/InvalidatePartitionedRegionMessageDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/InvalidatePartitionedRegionMessageDUnitTest.java
new file mode 100644
index 0000000..0207c75
--- /dev/null
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/InvalidatePartitionedRegionMessageDUnitTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.data.Portfolio;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+import org.apache.geode.test.junit.rules.VMProvider;
+
+@RunWith(JUnitParamsRunner.class)
+@Category(ClientSubscriptionTest.class)
+public class InvalidatePartitionedRegionMessageDUnitTest implements Serializable {
+ private MemberVM server1, server2;
+ private TestCqListener testListener;
+
+ @Rule
+ public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+ @Before
+ public void before() throws Exception {
+ MemberVM locator = clusterStartupRule.startLocatorVM(0, new Properties());
+ Integer locator1Port = locator.getPort();
+ server1 = clusterStartupRule.startServerVM(1, locator1Port);
+ server2 = clusterStartupRule.startServerVM(2, locator1Port);
+
+ VMProvider.invokeInEveryMember(() -> {
+ InternalCache cache = ClusterStartupRule.getCache();
+ assertThat(cache).isNotNull();
+ cache.createRegionFactory(RegionShortcut.PARTITION).create("region");
+ }, server1, server2);
+
+ ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+ clientCacheFactory.addPoolServer("localhost", server1.getPort());
+ clientCacheFactory.setPoolSubscriptionEnabled(true);
+ ClientCache clientCache = clientCacheFactory.create();
+ clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
+
+ QueryService queryService = clientCache.getQueryService();
+ CqAttributesFactory cqaf = new CqAttributesFactory();
+ testListener = new TestCqListener();
+ cqaf.addCqListener(testListener);
+ CqAttributes cqAttributes = cqaf.create();
+
+ queryService.newCq("Select * from /region r where r.ID + 3 > 4", cqAttributes).execute();
+ }
+
+ @Test
+ @Parameters({"1", "2"})
+ @TestCaseName("[{index}] {method}: server{params}")
+ public void closeMethodShouldBeCalledWhenRegionIsDestroyed(int serverIndex) {
+ // The test is run twice with destroy being invoked in each server
+ clusterStartupRule.getMember(serverIndex).invoke(() -> {
+ InternalCache cache = ClusterStartupRule.getCache();
+ assertThat(cache).isNotNull();
+ Region<Integer, Portfolio> regionOnServer = cache.getRegion("region");
+ regionOnServer.invalidateRegion();
+ });
+
+ }
+
+ private class TestCqListener implements CqListener, Serializable {
+ AtomicBoolean closeInvoked = new AtomicBoolean();
+
+ @Override
+ public void onEvent(CqEvent aCqEvent) {}
+
+ @Override
+ public void onError(CqEvent aCqEvent) {}
+
+ @Override
+ public void close() {
+ closeInvoked.set(true);
+ }
+ }
+}