You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/30 08:11:59 UTC

[07/10] ignite git commit: ignite-1745 GridCacheIoManager should send correct response in case of cache query error

ignite-1745 GridCacheIoManager should send correct response in case of cache query error


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7bedc8ac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7bedc8ac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7bedc8ac

Branch: refs/heads/ignite-1758
Commit: 7bedc8aceefb94d1bd87620887a8a44025518abe
Parents: 3ec52f3
Author: agura <ag...@gridgain.com>
Authored: Tue Oct 27 18:22:42 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Thu Oct 29 15:36:15 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    | 28 +++++++++++++++--
 ...niteCacheP2pUnmarshallingQueryErrorTest.java | 32 +++++++++++++++++++-
 2 files changed, 57 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7bedc8ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index ec34f41..082f330 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -53,6 +53,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
@@ -73,6 +75,9 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
  * Cache communication manager.
  */
 public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
+    /** Communication topic prefix for distributed queries. */
+    private static final String QUERY_TOPIC_PREFIX = "QUERY";
+
     /** Message ID generator. */
     private static final AtomicLong idGen = new AtomicLong();
 
@@ -304,8 +309,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     /**
      * Processes failed messages.
      *
-     * @param nodeId niode id.
-     * @param msg message.
+     * @param nodeId Node ID.
+     * @param msg Message.
      * @throws IgniteCheckedException If failed.
      */
     private void processFailedMessage(UUID nodeId, GridCacheMessage msg) throws IgniteCheckedException {
@@ -493,6 +498,25 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             break;
 
+            case 58: {
+                GridCacheQueryRequest req = (GridCacheQueryRequest)msg;
+
+                GridCacheQueryResponse res = new GridCacheQueryResponse(
+                    req.cacheId(),
+                    req.id(),
+                    req.classError(),
+                    cctx.deploymentEnabled());
+
+                cctx.io().sendOrderedMessage(
+                    ctx.node(nodeId),
+                    TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()),
+                    res,
+                    ctx.ioPolicy(),
+                    Long.MAX_VALUE);
+            }
+
+            break;
+
             default:
                 throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
                     + msg + "]");

http://git-wip-us.apache.org/repos/asf/ignite/blob/7bedc8ac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java
index 6a4ba3a..07fa2bc 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java
@@ -17,9 +17,13 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
 import javax.cache.CacheException;
+import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteBiPredicate;
 
 /**
  * Checks behavior on exception while unmarshalling key.
@@ -47,10 +51,36 @@ public class IgniteCacheP2pUnmarshallingQueryErrorTest extends IgniteCacheP2pUnm
         try {
             jcache(0).query(new SqlQuery<TestKey, String>(String.class, "field like '" + key + "'")).getAll();
 
-            assert false : "p2p marshalling failed, but error response was not sent";
+            fail("p2p marshalling failed, but error response was not sent");
         }
         catch (CacheException e) {
             // No-op
         }
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testResponseMessageOnRequestUnmarshallingFailed() throws Exception {
+        readCnt.set(Integer.MAX_VALUE);
+
+        jcache(0).put(new TestKey(String.valueOf(++key)), "");
+
+        try {
+            jcache().query(new ScanQuery<>(new IgniteBiPredicate<TestKey, String>() {
+                @Override public boolean apply(TestKey key, String val) {
+                    return false;
+                }
+
+                private void readObject(ObjectInputStream is) throws IOException {
+                    throw new IOException();
+                }
+            })).getAll();
+
+            fail("Request unmarshalling failed, but error response was not sent.");
+        }
+        catch (Exception e) {
+            // Expected: the predicate's readObject() always throws, so the query is supposed to fail.
+        }
+    }
 }
\ No newline at end of file