You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/11/24 13:24:12 UTC
[02/50] [abbrv] ignite git commit: IGNITE-2079 GridCacheIoManager
eats exception trail if it falls into the directed case merger from
ignite-2079-2
IGNITE-2079 GridCacheIoManager eats exception trail if it falls into the directed case
merger from ignite-2079-2
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9ddb8be1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9ddb8be1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9ddb8be1
Branch: refs/heads/ignite-4242
Commit: 9ddb8be1243df8e489f7ebc716d315415775439a
Parents: 4474046
Author: Dmitriy Govorukhin <dg...@gridgain.com>
Authored: Thu Oct 27 17:52:22 2016 +0300
Committer: Dmitriy Govorukhin <dg...@gridgain.com>
Committed: Thu Oct 27 17:52:22 2016 +0300
----------------------------------------------------------------------
.../org/apache/ignite/events/EventType.java | 6 +
.../ignite/events/UnhandledExceptionEvent.java | 61 ++++
.../processors/cache/GridCacheIoManager.java | 70 +++--
.../cache/query/GridCacheQueryManager.java | 10 +
.../query/GridCacheQueryMetricsAdapter.java | 9 +-
.../cache/query/GridCacheQueryResponse.java | 2 +-
.../continuous/CacheContinuousQueryHandler.java | 50 +--
...2pUnmarshallingContinuousQueryErrorTest.java | 302 +++++++++++++++++++
...niteCacheP2pUnmarshallingErrorTestSuite.java | 6 +-
9 files changed, 455 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/events/EventType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index 103dbd4..7778f67 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -859,6 +859,12 @@ public interface EventType {
public static final int EVT_IGFS_FILE_PURGED = 127;
/**
+ * Built-in event type: an exception was left unhandled.
+ * @see UnhandledExceptionEvent
+ */
+ public static final int EVT_UNHANDLED_EXCEPTION = 128;
+
+ /**
* All checkpoint events. This array can be directly passed into
* {@link IgniteEvents#localListen(IgnitePredicate, int...)} method to
* subscribe to all checkpoint events.
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/events/UnhandledExceptionEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/UnhandledExceptionEvent.java b/modules/core/src/main/java/org/apache/ignite/events/UnhandledExceptionEvent.java
new file mode 100644
index 0000000..cb6cd85
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/events/UnhandledExceptionEvent.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.events;
+
+import org.apache.ignite.cluster.ClusterNode;
+
+/**
+ * Event that carries an exception which was left unhandled.
+ */
+public class UnhandledExceptionEvent extends EventAdapter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Unhandled exception; stays {@code null} when the default constructor is used. */
+    private Exception ex;
+
+    /**
+     * Default constructor.
+     */
+    public UnhandledExceptionEvent() {
+    }
+
+    /**
+     * @param node Node.
+     * @param msg Message.
+     * @param ex Exception.
+     * @param type Type.
+     */
+    public UnhandledExceptionEvent(ClusterNode node, String msg, Exception ex, int type) {
+        super(node, msg, type);
+        this.ex = ex;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "msg=" + message() + ", type=" + type() + ", ex=" + ex;
+    }
+
+    /**
+     * Returns the exception carried by this event.
+     * @return Inner exception.
+     */
+    public Exception getException() {
+        return ex;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 78dddd3..5d7cb00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -17,50 +17,26 @@
package org.apache.ignite.internal.processors.cache;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.UnhandledExceptionEvent;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -79,6 +55,12 @@ import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import java.util.*;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.ignite.events.EventType.EVT_UNHANDLED_EXCEPTION;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
/**
@@ -693,6 +675,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
break;
+ case 59:
+ // No additional actions required, just skipping default switch section,
+ // since UnhandledException already registered.
+ break;
+
case 114: {
processMessage(nodeId,msg,c);// Will be handled by Rebalance Demander.
}
@@ -737,13 +724,34 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
break;
- default:
- throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
- + msg + "]", msg.classError());
+ default:{
+ String shortMsg = "Failed to send response to node. Unsupported direct type [message=" + msg + "]";
+
+ IgniteCheckedException e = new IgniteCheckedException(shortMsg, msg.classError());
+
+ registerUnhandledException(ctx, shortMsg, e);
+ }
}
}
/**
+     * Registers the exception in the exception registry and records an {@link UnhandledExceptionEvent}.
+     * @param ctx Grid cache context.
+     * @param shortMsg Short message.
+     * @param ex Original Exception.
+     */
+    public static void registerUnhandledException(GridCacheContext ctx, String shortMsg, IgniteCheckedException ex) {
+        GridKernalContext kernalCtx = ctx.kernalContext();
+
+        kernalCtx.exceptionRegistry().onException(shortMsg, ex);
+
+        UnhandledExceptionEvent unhandledEvt =
+            new UnhandledExceptionEvent(ctx.discovery().localNode(), shortMsg, ex, EVT_UNHANDLED_EXCEPTION);
+
+        kernalCtx.event().record(unhandledEvt);
+    }
+
+ /**
* @param nodeId Node ID.
* @param msg Message.
* @param c Closure.
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 7bd1a51..97e59c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -326,6 +326,16 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
+     * Accounts for an unhandled exception in the query metrics.
+     * The fails counter is incremented only when statistics are enabled in the cache configuration.
+     */
+    public void onUnhandledException() {
+        if (!cctx.config().isStatisticsEnabled())
+            return;
+        metrics.incrementOnFails();
+    }
+
+ /**
* Processes cache query request.
*
* @param sndId Sender node id.
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
index e70ea9f..d25b7c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
@@ -172,4 +172,11 @@ public class GridCacheQueryMetricsAdapter implements QueryMetrics, Externalizabl
@Override public String toString() {
return S.toString(GridCacheQueryMetricsAdapter.class, this);
}
-}
+
+ /**
+ * Increments the {@code fails} counter; called by {@code GridCacheQueryManager#onUnhandledException()}.
+ */
+ public void incrementOnFails() {
+ fails.increment();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
index 8492c38..2b86efe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
@@ -357,6 +357,6 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridCacheQueryResponse.class, this);
+ return S.toString(GridCacheQueryResponse.class, this, super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 304d031..4c91ea7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -17,28 +17,6 @@
package org.apache.ignite.internal.processors.cache.query.continuous;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryEventFilter;
-import javax.cache.event.CacheEntryUpdatedListener;
-import javax.cache.event.EventType;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -61,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
@@ -82,8 +61,22 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
+import static org.apache.ignite.internal.processors.cache.GridCacheIoManager.registerUnhandledException;
/**
* Continuous query handler.
@@ -688,8 +681,17 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
catch (IgniteCheckedException ex) {
if (ignoreClsNotFound)
assert internal;
- else
- U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex);
+ else {
+ String shortMsg = "Failed to unmarshal entry.";
+
+ U.error(ctx.log(getClass()), shortMsg, ex);
+
+ GridCacheQueryManager qryMgr = cctx.queries();
+
+ qryMgr.onUnhandledException();
+
+ registerUnhandledException(cctx, shortMsg, ex);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.java
new file mode 100644
index 0000000..82f5f09
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.events.UnhandledExceptionEvent;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMetricsAdapter;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.thread.IgniteThread;
+
+/**
+ * Checks behavior on exception while unmarshalling key for continuous query.
+ */
+public class IgniteCacheP2pUnmarshallingContinuousQueryErrorTest extends IgniteCacheP2pUnmarshallingErrorTest {
+    /**
+     * {@inheritDoc}
+     */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /** Counted down by the initial query filter, i.e. once the initial query is running. */
+    private static final CountDownLatch latch = new CountDownLatch(1);
+
+    /** Name of the node where unmarshalling fails with exceptions. */
+    private static volatile String failNode;
+
+    /** Number of UnhandledExceptionEvents seen by the listener on the client node. */
+    private static final AtomicInteger cnt = new AtomicInteger();
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        CacheConfiguration cacheCfg = super.cacheConfiguration(gridName);
+
+        cacheCfg.setStatisticsEnabled(true);
+
+        return cacheCfg;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public void testResponseMessageOnUnmarshallingFailed() throws Exception {
+        IgniteEx client = grid(0);
+        IgniteEx node1 = grid(1);
+        IgniteEx node2 = grid(2);
+
+        assert client.configuration().isClientMode() &&
+            !node1.configuration().isClientMode() &&
+            !node2.configuration().isClientMode();
+
+        failNode = client.name();
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                UnhandledExceptionEvent uex = (UnhandledExceptionEvent)evt;
+
+                assertTrue(X.getFullStackTrace(uex.getException()).
+                    contains("IOException: Class can not be unmarshalled"));
+
+                cnt.incrementAndGet();
+
+                return true;
+            }
+        }, EventType.EVT_UNHANDLED_EXCEPTION);
+
+        node1.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                fail("This line should never be called.");
+
+                return true;
+            }
+        }, EventType.EVT_UNHANDLED_EXCEPTION);
+
+        ContinuousQuery<TestKey, String> qry = new ContinuousQuery<>();
+
+        qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<TestKey, String>() {
+            @Override public boolean apply(TestKey key, String val) {
+                latch.countDown(); // Gives guarantee query initialized.
+
+                return true;
+            }
+        }));
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<TestKey, String>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends TestKey, ? extends String>> evts) {
+                fail("This line should never be called.");
+            }
+        });
+
+        validate(
+            0,//execs
+            0,//evts
+            0,//fails
+            client,
+            node1,
+            node2);
+
+        // Put element before creating QueryCursor.
+        putPrimary(node1);
+
+        try (QueryCursor<Cache.Entry<TestKey, String>> cur = client.cache(null).query(qry)) {
+            latch.await();
+
+            validate(
+                1,//execs
+                0,//evts
+                0,//fails
+                client,
+                node1,
+                node2);
+
+            putPrimary(node1);
+
+            validate(
+                1,//execs
+                1,//evts
+                1,//fails
+                client,
+                node1,
+                node2);
+
+            putPrimary(node2);
+
+            validate(
+                1,//execs
+                2,//evts
+                2,//fails
+                client,
+                node1,
+                node2);
+        }
+    }
+
+    /**
+     * @param ignite Ignite.
+     */
+    private void putPrimary(IgniteEx ignite) {
+        IgniteCache<TestKey, Object> cache = ignite.cache(null);
+
+        cache.put(generateNodeKeys(ignite, cache), "value");
+    }
+
+    /**
+     * @param execs Executions.
+     * @param evts Events.
+     * @param failsNum Fails number.
+     * @param client Client.
+     * @param node1 Node 1.
+     * @param node2 Node 2.
+     */
+    private void validate(final int execs, final int evts, final int failsNum, final IgniteEx client, IgniteEx node1,
+        IgniteEx node2) throws Exception {
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return client.cache(null).queryMetrics().fails() == failsNum;
+            }
+        }, 5_000));
+
+        assertEquals(evts, cnt.intValue());
+
+        validateCacheQueryMetrics(client, execs, failsNum);
+        validateCacheQueryMetrics(node1, 0, 0);
+        validateCacheQueryMetrics(node2, 0, 0);
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @param executions Expected executions.
+     * @param fails Expected fails.
+     */
+    private void validateCacheQueryMetrics(IgniteEx ignite, int executions, int fails) {
+        IgniteCache<Object, Object> cache = ignite.cache(null);
+
+        GridCacheQueryMetricsAdapter metr = (GridCacheQueryMetricsAdapter)cache.queryMetrics();
+
+        assertEquals(executions, metr.executions());
+
+        assertEquals(fails, metr.fails());
+    }
+
+    /**
+     * @param node Node.
+     * @param cache Cache.
+     * @return First generated key for which {@code node} is primary.
+     */
+    private TestKey generateNodeKeys(IgniteEx node, IgniteCache<TestKey, Object> cache) {
+        ClusterNode locNode = node.localNode();
+
+        for (int ind = 0; ind < 100_000; ind++) {
+            TestKey key = new TestKey("key" + ind);
+
+            if (affinity(cache).isPrimary(locNode, key))
+                return key;
+        }
+
+        throw new IgniteException("Unable to find a key that is primary for the node.");
+    }
+
+    /**
+     * Key whose unmarshalling throws on threads of the node named by {@code failNode}.
+     */
+    private static class TestKey implements Externalizable {
+        /**
+         * Field.
+         */
+        @QuerySqlField(index = true)
+        private String field;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public TestKey() {
+        }
+
+        /**
+         * @param field Test key 1.
+         */
+        public TestKey(String field) {
+            this.field = field;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey key = (TestKey)o;
+
+            return field != null ? field.equals(key.field) : key.field == null;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public int hashCode() {
+            return field != null ? field.hashCode() : 0;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(field);
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            field = (String)in.readObject();
+
+            // Simulate an unmarshalling failure, but only on threads whose grid name equals failNode.
+            if (((IgniteThread)Thread.currentThread()).getGridName().equals(failNode))
+                throw new IOException("Class can not be unmarshalled.");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheP2pUnmarshallingErrorTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheP2pUnmarshallingErrorTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheP2pUnmarshallingErrorTestSuite.java
index dfc96dc..b45d134 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheP2pUnmarshallingErrorTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheP2pUnmarshallingErrorTestSuite.java
@@ -19,10 +19,7 @@ package org.apache.ignite.testsuites;
import java.util.Set;
import junit.framework.TestSuite;
-import org.apache.ignite.internal.processors.cache.IgniteCacheP2pUnmarshallingErrorTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheP2pUnmarshallingNearErrorTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheP2pUnmarshallingRebalanceErrorTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheP2pUnmarshallingTxErrorTest;
+import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.testframework.GridTestUtils;
/**
@@ -49,6 +46,7 @@ public class IgniteCacheP2pUnmarshallingErrorTestSuite extends TestSuite {
GridTestUtils.addTestIfNeeded(suite, IgniteCacheP2pUnmarshallingNearErrorTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteCacheP2pUnmarshallingRebalanceErrorTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteCacheP2pUnmarshallingTxErrorTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.class, ignoredTests);
return suite;
}