You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2019/02/15 22:27:36 UTC

[geode] branch develop updated: GEODE-6391: Adding the event ID to the messages (#3184)

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new b7da1f5  GEODE-6391: Adding the event ID to the messages (#3184)
b7da1f5 is described below

commit b7da1f57abbde2ded2b6bbcb2459e269a9c5477a
Author: Nabarun Nag <na...@users.noreply.github.com>
AuthorDate: Fri Feb 15 14:27:21 2019 -0800

    GEODE-6391: Adding the event ID to the messages (#3184)
    
    	* The event ID is now included in DestroyPartitionedRegionMessage and InvalidatePartitionedRegionMessage.
    	* This prevents an NPE.
    	* The NPE occurs when the server tries to notify clients after receiving the message.
    	* Because the event ID is null, reading its thread ID and sequence ID throws the NPE.
---
 .../codeAnalysis/sanctionedDataSerializables.txt   |  14 ++-
 .../cache/DestroyPartitionedRegionMessage.java     |  33 +++++-
 .../cache/InvalidatePartitionedRegionMessage.java  |  38 +++++--
 .../geode/internal/cache/RegionEventImpl.java      |   3 -
 .../DestroyPartitionedRegionMessageDUnitTest.java  | 124 +++++++++++++++++++++
 ...nvalidatePartitionedRegionMessageDUnitTest.java | 115 +++++++++++++++++++
 6 files changed, 311 insertions(+), 16 deletions(-)

diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 3117ff4..97798d0 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -895,9 +895,11 @@ org/apache/geode/internal/cache/DestroyOperation$DestroyWithContextMessage,2
 fromData,14
 toData,14
 
-org/apache/geode/internal/cache/DestroyPartitionedRegionMessage,2
-fromData,76
-toData,77
+org/apache/geode/internal/cache/DestroyPartitionedRegionMessage,4
+fromData,17
+fromDataPre_GEODE_1_9_0_0,76
+toData,14
+toDataPre_GEODE_1_9_0_0,77
 
 org/apache/geode/internal/cache/DestroyRegionOperation$DestroyRegionMessage,2
 fromData,41
@@ -1115,9 +1117,11 @@ org/apache/geode/internal/cache/InvalidateOperation$InvalidateWithContextMessage
 fromData,14
 toData,14
 
-org/apache/geode/internal/cache/InvalidatePartitionedRegionMessage,2
-fromData,14
+org/apache/geode/internal/cache/InvalidatePartitionedRegionMessage,4
+fromData,17
+fromDataPre_GEODE_1_9_0_0,14
 toData,14
+toDataPre_GEODE_1_9_0_0,14
 
 org/apache/geode/internal/cache/InvalidateRegionOperation$InvalidateRegionMessage,2
 fromData,17
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyPartitionedRegionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyPartitionedRegionMessage.java
index 5fbe876..65c0fa3 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyPartitionedRegionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyPartitionedRegionMessage.java
@@ -33,6 +33,7 @@ import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.ReplyMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.partitioned.PartitionMessage;
 import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
 import org.apache.geode.internal.cache.partitioned.RegionAdvisor.PartitionProfile;
@@ -72,6 +73,15 @@ public class DestroyPartitionedRegionMessage extends PartitionMessage {
   /** Serial numbers of the buckets for this region */
   private int bucketSerials[];
 
+  /** Event ID of the destroy operation created at the origin */
+  private EventID eventID;
+
+  @Override
+  public EventID getEventID() {
+    return eventID;
+  }
+
+
   /**
    * Empty constructor to satisfy {@link DataSerializer} requirements
    */
@@ -92,6 +102,7 @@ public class DestroyPartitionedRegionMessage extends PartitionMessage {
     this.prSerial = region.getSerialNumber();
     Assert.assertTrue(this.prSerial != DistributionAdvisor.ILLEGAL_SERIAL);
     this.bucketSerials = serials;
+    this.eventID = event.getEventId();
   }
 
   /**
@@ -177,7 +188,8 @@ public class DestroyPartitionedRegionMessage extends PartitionMessage {
       logger.trace(LogMarker.DM_VERBOSE, "{} operateOnRegion: {}", getClass().getName(),
           r.getFullPath());
     }
-    RegionEventImpl event = new RegionEventImpl(r, this.op, this.cbArg, true, r.getMyId());
+    RegionEventImpl event =
+        new RegionEventImpl(r, this.op, this.cbArg, true, r.getMyId(), getEventID());
     r.basicDestroyRegion(event, false, false, true);
 
     return true;
@@ -203,7 +215,19 @@ public class DestroyPartitionedRegionMessage extends PartitionMessage {
   }
 
   @Override
+  public Version[] getSerializationVersions() {
+    return new Version[] {Version.GEODE_190};
+  }
+
+
+  @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    fromDataPre_GEODE_1_9_0_0(in);
+    this.eventID = DataSerializer.readObject(in);
+
+  }
+
+  public void fromDataPre_GEODE_1_9_0_0(DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
     this.cbArg = DataSerializer.readObject(in);
     this.op = Operation.fromOrdinal(in.readByte());
@@ -217,6 +241,11 @@ public class DestroyPartitionedRegionMessage extends PartitionMessage {
 
   @Override
   public void toData(DataOutput out) throws IOException {
+    toDataPre_GEODE_1_9_0_0(out);
+    DataSerializer.writeObject(this.eventID, out);
+  }
+
+  public void toDataPre_GEODE_1_9_0_0(DataOutput out) throws IOException {
     super.toData(out);
     DataSerializer.writeObject(this.cbArg, out);
     out.writeByte(this.op.ordinal);
@@ -227,6 +256,8 @@ public class DestroyPartitionedRegionMessage extends PartitionMessage {
     }
   }
 
+
+
   /**
    * The response on which to wait for all the replies. This response ignores any exceptions
    * received from the "far side"
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidatePartitionedRegionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidatePartitionedRegionMessage.java
index 991f302..aabacae 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidatePartitionedRegionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InvalidatePartitionedRegionMessage.java
@@ -25,24 +25,33 @@ import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.query.QueryException;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.partitioned.PartitionMessage;
 
 public class InvalidatePartitionedRegionMessage extends PartitionMessage {
 
   private Object callbackArg;
 
+  @Override
+  public EventID getEventID() {
+    return eventID;
+  }
+
+  private EventID eventID;
+
   public InvalidatePartitionedRegionMessage() {}
 
   public InvalidatePartitionedRegionMessage(Set recipients, Object callbackArg, PartitionedRegion r,
-      ReplyProcessor21 processor) {
+      ReplyProcessor21 processor, EventID eventID) {
     super(recipients, r.getPRId(), processor);
     this.callbackArg = callbackArg;
+    this.eventID = eventID;
   }
 
   public static ReplyProcessor21 send(Set recipients, PartitionedRegion r, RegionEventImpl event) {
     ReplyProcessor21 response = new ReplyProcessor21(r.getSystem(), recipients);
     InvalidatePartitionedRegionMessage msg = new InvalidatePartitionedRegionMessage(recipients,
-        event.getCallbackArgument(), r, response);
+        event.getCallbackArgument(), r, response, event.getEventId());
     msg.setTransactionDistributed(r.getCache().getTxManager().isDistributed());
     r.getSystem().getDistributionManager().putOutgoing(msg);
     return response;
@@ -62,7 +71,7 @@ public class InvalidatePartitionedRegionMessage extends PartitionMessage {
       throws CacheException, QueryException, ForceReattemptException, InterruptedException {
 
     RegionEventImpl event = new RegionEventImpl(pr, Operation.REGION_INVALIDATE, this.callbackArg,
-        !dm.getId().equals(getSender()), getSender());
+        !dm.getId().equals(getSender()), getSender(), getEventID());
     pr.basicInvalidateRegion(event);
     return true;
   }
@@ -77,6 +86,11 @@ public class InvalidatePartitionedRegionMessage extends PartitionMessage {
     return INVALIDATE_PARTITIONED_REGION_MESSAGE;
   }
 
+  public void fromDataPre_GEODE_1_9_0_0(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.callbackArg = DataSerializer.readObject(in);
+  }
+
   /*
    * (non-Javadoc)
    *
@@ -84,8 +98,13 @@ public class InvalidatePartitionedRegionMessage extends PartitionMessage {
    */
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    super.fromData(in);
-    this.callbackArg = DataSerializer.readObject(in);
+    fromDataPre_GEODE_1_9_0_0(in);
+    this.eventID = DataSerializer.readObject(in);
+  }
+
+  public void toDataPre_GEODE_1_9_0_0(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writeObject(this.callbackArg, out);
   }
 
   /*
@@ -95,7 +114,12 @@ public class InvalidatePartitionedRegionMessage extends PartitionMessage {
    */
   @Override
   public void toData(DataOutput out) throws IOException {
-    super.toData(out);
-    DataSerializer.writeObject(this.callbackArg, out);
+    toDataPre_GEODE_1_9_0_0(out);
+    DataSerializer.writeObject(this.eventID, out);
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return new Version[] {Version.GEODE_190};
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
index 9485b64..fc339cd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
@@ -26,7 +26,6 @@ import org.apache.geode.cache.RegionEvent;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.DSFIDFactory;
 import org.apache.geode.internal.DataSerializableFixedID;
 import org.apache.geode.internal.InternalDataSerializer;
@@ -105,8 +104,6 @@ public class RegionEventImpl
     this.callbackArgument = callbackArgument;
     this.originRemote = originRemote;
     this.distributedMember = distributedMember;
-    // TODO:ASIF: Remove this Assert from production env.
-    Assert.assertTrue(eventID != null);
     this.eventId = eventID;
   }
 
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/DestroyPartitionedRegionMessageDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/DestroyPartitionedRegionMessageDUnitTest.java
new file mode 100644
index 0000000..d2eed8d
--- /dev/null
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/DestroyPartitionedRegionMessageDUnitTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.data.Portfolio;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+import org.apache.geode.test.junit.rules.VMProvider;
+
+@RunWith(JUnitParamsRunner.class)
+@Category(ClientSubscriptionTest.class)
+public class DestroyPartitionedRegionMessageDUnitTest implements Serializable {
+  private MemberVM server1, server2;
+  private TestCqListener testListener;
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Before
+  public void before() throws Exception {
+    MemberVM locator = clusterStartupRule.startLocatorVM(0, new Properties());
+    Integer locator1Port = locator.getPort();
+    server1 = clusterStartupRule.startServerVM(1, locator1Port);
+    server2 = clusterStartupRule.startServerVM(2, locator1Port);
+
+    VMProvider.invokeInEveryMember(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      assertThat(cache).isNotNull();
+      cache.createRegionFactory(RegionShortcut.PARTITION).create("region");
+    }, server1, server2);
+
+    ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+    clientCacheFactory.addPoolServer("localhost", server1.getPort());
+    clientCacheFactory.setPoolSubscriptionEnabled(true);
+    ClientCache clientCache = clientCacheFactory.create();
+    clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
+
+    QueryService queryService = clientCache.getQueryService();
+    CqAttributesFactory cqaf = new CqAttributesFactory();
+    testListener = new TestCqListener();
+    cqaf.addCqListener(testListener);
+    CqAttributes cqAttributes = cqaf.create();
+
+    queryService.newCq("Select * from /region r where r.ID + 3 > 4", cqAttributes).execute();
+  }
+
+  @Test
+  @Parameters({"1", "2"})
+  @TestCaseName("[{index}] {method}: server{params}")
+  public void closeMethodShouldBeCalledWhenRegionIsDestroyed(int serverIndex) {
+    // The test is run twice with destroy being invoked in each server
+    clusterStartupRule.getMember(serverIndex).invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      assertThat(cache).isNotNull();
+      Region<Integer, Portfolio> regionOnServer = cache.getRegion("region");
+      regionOnServer.destroyRegion();
+    });
+
+    // Wait until region destroy operation has been distributed.
+    VMProvider.invokeInEveryMember(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      assertThat(cache).isNotNull();
+      await().until(() -> cache.getRegion("region") == null);
+    }, server1, server2);
+
+    assertThat(testListener.closeInvoked.get()).isTrue();
+  }
+
+  private class TestCqListener implements CqListener, Serializable {
+    AtomicBoolean closeInvoked = new AtomicBoolean();
+
+    @Override
+    public void onEvent(CqEvent aCqEvent) {}
+
+    @Override
+    public void onError(CqEvent aCqEvent) {}
+
+    @Override
+    public void close() {
+      closeInvoked.set(true);
+    }
+  }
+}
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/InvalidatePartitionedRegionMessageDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/InvalidatePartitionedRegionMessageDUnitTest.java
new file mode 100644
index 0000000..0207c75
--- /dev/null
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/InvalidatePartitionedRegionMessageDUnitTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.data.Portfolio;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+import org.apache.geode.test.junit.rules.VMProvider;
+
+@RunWith(JUnitParamsRunner.class)
+@Category(ClientSubscriptionTest.class)
+public class InvalidatePartitionedRegionMessageDUnitTest implements Serializable {
+  private MemberVM server1, server2;
+  private TestCqListener testListener;
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Before
+  public void before() throws Exception {
+    MemberVM locator = clusterStartupRule.startLocatorVM(0, new Properties());
+    Integer locator1Port = locator.getPort();
+    server1 = clusterStartupRule.startServerVM(1, locator1Port);
+    server2 = clusterStartupRule.startServerVM(2, locator1Port);
+
+    VMProvider.invokeInEveryMember(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      assertThat(cache).isNotNull();
+      cache.createRegionFactory(RegionShortcut.PARTITION).create("region");
+    }, server1, server2);
+
+    ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+    clientCacheFactory.addPoolServer("localhost", server1.getPort());
+    clientCacheFactory.setPoolSubscriptionEnabled(true);
+    ClientCache clientCache = clientCacheFactory.create();
+    clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
+
+    QueryService queryService = clientCache.getQueryService();
+    CqAttributesFactory cqaf = new CqAttributesFactory();
+    testListener = new TestCqListener();
+    cqaf.addCqListener(testListener);
+    CqAttributes cqAttributes = cqaf.create();
+
+    queryService.newCq("Select * from /region r where r.ID + 3 > 4", cqAttributes).execute();
+  }
+
+  @Test
+  @Parameters({"1", "2"})
+  @TestCaseName("[{index}] {method}: server{params}")
+  public void closeMethodShouldBeCalledWhenRegionIsDestroyed(int serverIndex) {
+    // The test is run twice with invalidate being invoked in each server
+    clusterStartupRule.getMember(serverIndex).invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      assertThat(cache).isNotNull();
+      Region<Integer, Portfolio> regionOnServer = cache.getRegion("region");
+      regionOnServer.invalidateRegion();
+    });
+
+  }
+
+  private class TestCqListener implements CqListener, Serializable {
+    AtomicBoolean closeInvoked = new AtomicBoolean();
+
+    @Override
+    public void onEvent(CqEvent aCqEvent) {}
+
+    @Override
+    public void onError(CqEvent aCqEvent) {}
+
+    @Override
+    public void close() {
+      closeInvoked.set(true);
+    }
+  }
+}