You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by il...@apache.org on 2021/05/04 14:40:33 UTC
[ignite] branch master updated: IGNITE-14669 Fixed NPE when node caused query deserialization error and left - Fixes #9067.
This is an automated email from the ASF dual-hosted git repository.
ilyak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f26d676 IGNITE-14669 Fixed NPE when node caused query deserialization error and left - Fixes #9067.
f26d676 is described below
commit f26d67643de0e7c9d7791df051606a3c78dc6d25
Author: Ilya Kasnacheev <il...@gmail.com>
AuthorDate: Tue May 4 17:39:32 2021 +0300
IGNITE-14669 Fixed NPE when node caused query deserialization error and left - Fixes #9067.
---
.../processors/cache/GridCacheIoManager.java | 20 +++++++---
.../apache/ignite/client/PersonBinarylizable.java | 31 ++++++++++++++-
.../internal/client/thin/CacheAsyncTest.java | 5 ++-
.../IgniteCacheContinuousQueryClientTest.java | 45 ++++++++++++++++++++++
4 files changed, 92 insertions(+), 9 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 8bbde01..80c681d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -930,12 +930,20 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
req.classError(),
cctx.deploymentEnabled());
- cctx.io().sendOrderedMessage(
- cctx.node(nodeId),
- TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()),
- res,
- plc,
- Long.MAX_VALUE);
+ ClusterNode node = cctx.node(nodeId);
+
+ if (node == null) {
+ U.error(log, "Failed to send message because node left grid [nodeId=" + nodeId +
+ ", msg=" + msg + ']');
+ }
+ else {
+ cctx.io().sendOrderedMessage(
+ node,
+ TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()),
+ res,
+ plc,
+ Long.MAX_VALUE);
+ }
}
break;
diff --git a/modules/core/src/test/java/org/apache/ignite/client/PersonBinarylizable.java b/modules/core/src/test/java/org/apache/ignite/client/PersonBinarylizable.java
index cd0ff1f..28918b2 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/PersonBinarylizable.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/PersonBinarylizable.java
@@ -21,6 +21,8 @@ import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.binary.BinaryReader;
import org.apache.ignite.binary.BinaryWriter;
import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
/**
* A person entity for the tests.
@@ -36,10 +38,14 @@ public class PersonBinarylizable implements Binarylizable {
private boolean readThrows;
/** */
- public PersonBinarylizable(String name, boolean writeThrows, boolean readThrows) {
+ private boolean readWaits;
+
+ /** */
+ public PersonBinarylizable(String name, boolean writeThrows, boolean readThrows, boolean readWaits) {
this.name = name;
this.writeThrows = writeThrows;
this.readThrows = readThrows;
+ this.readWaits = readWaits;
}
/** */
@@ -72,11 +78,22 @@ public class PersonBinarylizable implements Binarylizable {
this.readThrows = readThrows;
}
+ /** */
+ public boolean isReadWaits() {
+ return readWaits;
+ }
+
+ /** */
+ public void setReadWaits(boolean readWaits) {
+ this.readWaits = readWaits;
+ }
+
/** {@inheritDoc} */
@Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
writer.writeString("name", name);
writer.writeBoolean("writeThrows", writeThrows);
writer.writeBoolean("readThrows", readThrows);
+ writer.writeBoolean("readWaits", readWaits);
if (writeThrows)
throw new ArithmeticException("_write_");
@@ -87,6 +104,18 @@ public class PersonBinarylizable implements Binarylizable {
name = reader.readString("name");
writeThrows = reader.readBoolean("writeThrows");
readThrows = reader.readBoolean("readThrows");
+ readWaits = reader.readBoolean("readWaits");
+
+ if (readWaits) {
+ try {
+ U.sleep(1000);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ e.printStackTrace();
+
+ Thread.currentThread().interrupt();
+ }
+ }
if (readThrows)
throw new ArithmeticException("_read_");
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheAsyncTest.java
index 83540c5..31920a3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheAsyncTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheAsyncTest.java
@@ -342,7 +342,7 @@ public class CacheAsyncTest extends AbstractThinClientTest {
@Test
public void testGetAsyncThrowsExceptionOnFailedDeserialization() throws Exception {
ClientCache<Integer, PersonBinarylizable> cache = client.createCache(TMP_CACHE_NAME);
- cache.put(1, new PersonBinarylizable("1", false, true));
+ cache.put(1, new PersonBinarylizable("1", false, true, false));
Throwable t = cache.getAsync(1).handle((res, err) -> err).toCompletableFuture().get();
@@ -358,7 +358,8 @@ public class CacheAsyncTest extends AbstractThinClientTest {
public void testPutAsyncThrowsExceptionOnFailedSerialization() {
ClientCache<Integer, PersonBinarylizable> cache = client.createCache(TMP_CACHE_NAME);
- IgniteClientFuture<Void> fut = cache.putAsync(1, new PersonBinarylizable("1", true, false));
+ IgniteClientFuture<Void> fut = cache.putAsync(1,
+ new PersonBinarylizable("1", true, false, false));
GridTestUtils.assertThrowsAnyCause(null, fut::get, BinaryObjectException.class,
"Failed to serialize object [typeName=org.apache.ignite.client.PersonBinarylizable]");
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
index 9c9ea57..6e0268b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.query.continuous;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.Ignite;
@@ -27,8 +28,12 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.client.PersonBinarylizable;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.AbstractFailureHandler;
+import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.resources.LoggerResource;
@@ -45,10 +50,21 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
*
*/
public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest {
+ /** */
+ private AtomicBoolean failure = new AtomicBoolean(false);
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+ cfg.setFailureHandler(new AbstractFailureHandler() {
+ @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
+ failure.set(true);
+
+ return true;
+ }
+ });
+
CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
ccfg.setCacheMode(PARTITIONED);
@@ -243,6 +259,35 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest
tryClose(cur);
}
+
+ /**
+ * Checks that deserialization error after client node leaves does not fail server node.
+ */
+ @Test
+ public void testFailedSerializationAfterNodeLeaves() throws Exception {
+ startGrids(1);
+
+ final int CLIENT_ID = 1;
+
+ Ignite clientNode = startClientGrid(CLIENT_ID);
+
+ IgniteCache<Integer, PersonBinarylizable> cache = clientNode.cache(DEFAULT_CACHE_NAME);
+
+ PersonBinarylizable bin = new PersonBinarylizable("1", false, true, true);
+
+ cache.query(new ScanQuery<>((k, v) -> !v.equals(bin)));
+
+ stopGrid(1);
+
+ Thread.sleep(1100);
+
+ assertNotNull(grid(0).cache(DEFAULT_CACHE_NAME));
+
+ assertFalse(failure.get());
+
+ stopGrid(0);
+ }
+
/**
* @param cur Cur.
*/