You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by il...@apache.org on 2021/05/04 14:40:33 UTC

[ignite] branch master updated: IGNITE-14669 Fixed NPE when node caused query deserialization error and left - Fixes #9067.

This is an automated email from the ASF dual-hosted git repository.

ilyak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new f26d676  IGNITE-14669 Fixed NPE when node caused query deserialization error and left - Fixes #9067.
f26d676 is described below

commit f26d67643de0e7c9d7791df051606a3c78dc6d25
Author: Ilya Kasnacheev <il...@gmail.com>
AuthorDate: Tue May 4 17:39:32 2021 +0300

    IGNITE-14669 Fixed NPE when node caused query deserialization error and left - Fixes #9067.
---
 .../processors/cache/GridCacheIoManager.java       | 20 +++++++---
 .../apache/ignite/client/PersonBinarylizable.java  | 31 ++++++++++++++-
 .../internal/client/thin/CacheAsyncTest.java       |  5 ++-
 .../IgniteCacheContinuousQueryClientTest.java      | 45 ++++++++++++++++++++++
 4 files changed, 92 insertions(+), 9 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 8bbde01..80c681d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -930,12 +930,20 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     req.classError(),
                     cctx.deploymentEnabled());
 
-                cctx.io().sendOrderedMessage(
-                    cctx.node(nodeId),
-                    TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()),
-                    res,
-                    plc,
-                    Long.MAX_VALUE);
+                ClusterNode node = cctx.node(nodeId);
+
+                if (node == null) {
+                    U.error(log, "Failed to send message because node left grid [nodeId=" + nodeId +
+                        ", msg=" + msg + ']');
+                }
+                else {
+                    cctx.io().sendOrderedMessage(
+                        node,
+                        TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()),
+                        res,
+                        plc,
+                        Long.MAX_VALUE);
+                }
             }
 
             break;
diff --git a/modules/core/src/test/java/org/apache/ignite/client/PersonBinarylizable.java b/modules/core/src/test/java/org/apache/ignite/client/PersonBinarylizable.java
index cd0ff1f..28918b2 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/PersonBinarylizable.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/PersonBinarylizable.java
@@ -21,6 +21,8 @@ import org.apache.ignite.binary.BinaryObjectException;
 import org.apache.ignite.binary.BinaryReader;
 import org.apache.ignite.binary.BinaryWriter;
 import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  * A person entity for the tests.
@@ -36,10 +38,14 @@ public class PersonBinarylizable implements Binarylizable {
     private boolean readThrows;
 
     /** */
-    public PersonBinarylizable(String name, boolean writeThrows, boolean readThrows) {
+    private boolean readWaits;
+
+    /** */
+    public PersonBinarylizable(String name, boolean writeThrows, boolean readThrows, boolean readWaits) {
         this.name = name;
         this.writeThrows = writeThrows;
         this.readThrows = readThrows;
+        this.readWaits = readWaits;
     }
 
     /** */
@@ -72,11 +78,22 @@ public class PersonBinarylizable implements Binarylizable {
         this.readThrows = readThrows;
     }
 
+    /** */
+    public boolean isReadWaits() {
+        return readWaits;
+    }
+
+    /** */
+    public void setReadWaits(boolean readWaits) {
+        this.readWaits = readWaits;
+    }
+
     /** {@inheritDoc} */
     @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
         writer.writeString("name", name);
         writer.writeBoolean("writeThrows", writeThrows);
         writer.writeBoolean("readThrows", readThrows);
+        writer.writeBoolean("readWaits", readWaits);
 
         if (writeThrows)
             throw new ArithmeticException("_write_");
@@ -87,6 +104,18 @@ public class PersonBinarylizable implements Binarylizable {
         name = reader.readString("name");
         writeThrows = reader.readBoolean("writeThrows");
         readThrows = reader.readBoolean("readThrows");
+        readWaits = reader.readBoolean("readWaits");
+
+        if (readWaits) {
+            try {
+                U.sleep(1000);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                e.printStackTrace();
+
+                Thread.currentThread().interrupt();
+            }
+        }
 
         if (readThrows)
             throw new ArithmeticException("_read_");
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheAsyncTest.java
index 83540c5..31920a3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheAsyncTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheAsyncTest.java
@@ -342,7 +342,7 @@ public class CacheAsyncTest extends AbstractThinClientTest {
     @Test
     public void testGetAsyncThrowsExceptionOnFailedDeserialization() throws Exception {
         ClientCache<Integer, PersonBinarylizable> cache = client.createCache(TMP_CACHE_NAME);
-        cache.put(1, new PersonBinarylizable("1", false, true));
+        cache.put(1, new PersonBinarylizable("1", false, true, false));
 
         Throwable t = cache.getAsync(1).handle((res, err) -> err).toCompletableFuture().get();
 
@@ -358,7 +358,8 @@ public class CacheAsyncTest extends AbstractThinClientTest {
     public void testPutAsyncThrowsExceptionOnFailedSerialization() {
         ClientCache<Integer, PersonBinarylizable> cache = client.createCache(TMP_CACHE_NAME);
 
-        IgniteClientFuture<Void> fut = cache.putAsync(1, new PersonBinarylizable("1", true, false));
+        IgniteClientFuture<Void> fut = cache.putAsync(1,
+            new PersonBinarylizable("1", true, false, false));
 
         GridTestUtils.assertThrowsAnyCause(null, fut::get, BinaryObjectException.class,
             "Failed to serialize object [typeName=org.apache.ignite.client.PersonBinarylizable]");
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
index 9c9ea57..6e0268b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.Ignite;
@@ -27,8 +28,12 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.client.PersonBinarylizable;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.AbstractFailureHandler;
+import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.resources.LoggerResource;
@@ -45,10 +50,21 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
  *
  */
 public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest {
+    /** */
+    private AtomicBoolean failure = new AtomicBoolean(false);
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
+        cfg.setFailureHandler(new AbstractFailureHandler() {
+            @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
+                failure.set(true);
+
+                return true;
+            }
+        });
+
         CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
 
         ccfg.setCacheMode(PARTITIONED);
@@ -243,6 +259,35 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest
         tryClose(cur);
     }
 
+
+    /**
+     * Checks that deserialization error after client node leaves does not fail server node.
+     */
+    @Test
+    public void testFailedSerializationAfterNodeLeaves() throws Exception {
+        startGrids(1);
+
+        final int CLIENT_ID = 1;
+
+        Ignite clientNode = startClientGrid(CLIENT_ID);
+
+        IgniteCache<Integer, PersonBinarylizable> cache = clientNode.cache(DEFAULT_CACHE_NAME);
+
+        PersonBinarylizable bin = new PersonBinarylizable("1", false, true, true);
+
+        cache.query(new ScanQuery<>((k, v) -> !v.equals(bin)));
+
+        stopGrid(1);
+
+        Thread.sleep(1100);
+
+        assertNotNull(grid(0).cache(DEFAULT_CACHE_NAME));
+
+        assertFalse(failure.get());
+
+        stopGrid(0);
+    }
+
     /**
      * @param cur Cur.
      */