You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/08 11:21:05 UTC

[02/50] [abbrv] incubator-ignite git commit: ignite-646

ignite-646


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a47974c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a47974c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a47974c3

Branch: refs/heads/ignite-gg-10186
Commit: a47974c3444da2f1804ca1c0b80cd74f92cdf137
Parents: 7a8e075
Author: avinogradov <av...@gridgain.com>
Authored: Tue Apr 21 16:33:13 2015 +0300
Committer: avinogradov <av...@gridgain.com>
Committed: Tue Apr 21 16:33:13 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    | 160 ++++++++++++-------
 .../distributed/dht/GridDhtCacheAdapter.java    |  19 ---
 .../dht/atomic/GridDhtAtomicCache.java          |  31 +---
 .../IgniteCacheP2pUnmarshallingErrorTest.java   |  46 +++---
 4 files changed, 131 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a47974c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 56ee65e..0df824f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -24,6 +24,8 @@ import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -32,7 +34,6 @@ import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
-import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -226,68 +227,66 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             unmarshall(nodeId, cacheMsg);
 
-            if (cacheMsg.allowForStartup())
-                processMessage(nodeId, cacheMsg, c);
+            if (cacheMsg.classError() != null)
+                processFailedMessage(nodeId, cacheMsg);
             else {
-                IgniteInternalFuture<?> startFut = startFuture(cacheMsg);
-
-                if (startFut.isDone())
+                if (cacheMsg.allowForStartup())
                     processMessage(nodeId, cacheMsg, c);
                 else {
-                    if (log.isDebugEnabled())
-                        log.debug("Waiting for start future to complete for message [nodeId=" + nodeId +
-                            ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']');
-
-                    // Don't hold this thread waiting for preloading to complete.
-                    startFut.listen(new CI1<IgniteInternalFuture<?>>() {
-                        @Override public void apply(final IgniteInternalFuture<?> f) {
-                            cctx.kernalContext().closure().runLocalSafe(
-                                new GridPlainRunnable() {
-                                    @Override public void run() {
-                                        rw.readLock();
-
-                                        try {
-                                            if (stopping) {
-                                                if (log.isDebugEnabled())
-                                                    log.debug("Received cache communication message while stopping " +
-                                                        "(will ignore) [nodeId=" + nodeId + ", msg=" + cacheMsg + ']');
+                    IgniteInternalFuture<?> startFut = startFuture(cacheMsg);
 
-                                                return;
-                                            }
+                    if (startFut.isDone())
+                        processMessage(nodeId, cacheMsg, c);
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Waiting for start future to complete for message [nodeId=" + nodeId +
+                                ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']');
 
-                                            f.get();
+                        // Don't hold this thread waiting for preloading to complete.
+                        startFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                            @Override public void apply(final IgniteInternalFuture<?> f) {
+                                cctx.kernalContext().closure().runLocalSafe(
+                                    new GridPlainRunnable() {
+                                        @Override public void run() {
+                                            rw.readLock();
 
-                                            if (log.isDebugEnabled())
-                                                log.debug("Start future completed for message [nodeId=" + nodeId +
-                                                    ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']');
+                                            try {
+                                                if (stopping) {
+                                                    if (log.isDebugEnabled())
+                                                        log.debug("Received cache communication message while stopping " +
+                                                            "(will ignore) [nodeId=" + nodeId + ", msg=" + cacheMsg + ']');
 
-                                            processMessage(nodeId, cacheMsg, c);
-                                        }
-                                        catch (IgniteCheckedException e) {
-                                            // Log once.
-                                            if (startErr.compareAndSet(false, true))
-                                                U.error(log, "Failed to complete preload start future " +
-                                                    "(will ignore message) " +
-                                                    "[fut=" + f + ", nodeId=" + nodeId + ", msg=" + cacheMsg + ']', e);
-                                        }
-                                        finally {
-                                            rw.readUnlock();
+                                                    return;
+                                                }
+
+                                                f.get();
+
+                                                if (log.isDebugEnabled())
+                                                    log.debug("Start future completed for message [nodeId=" + nodeId +
+                                                        ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']');
+
+                                                processMessage(nodeId, cacheMsg, c);
+                                            }
+                                            catch (IgniteCheckedException e) {
+                                                // Log once.
+                                                if (startErr.compareAndSet(false, true))
+                                                    U.error(log, "Failed to complete preload start future " +
+                                                        "(will ignore message) " +
+                                                        "[fut=" + f + ", nodeId=" + nodeId + ", msg=" + cacheMsg + ']', e);
+                                            }
+                                            finally {
+                                                rw.readUnlock();
+                                            }
                                         }
                                     }
-                                }
-                            );
-                        }
-                    });
+                                );
+                            }
+                        });
+                    }
                 }
             }
         }
         catch (Throwable e) {
-//            if (X.hasCause(e, ClassNotFoundException.class))
-//                U.error(log, "Failed to process message (note that distributed services " +
-//                    "do not support peer class loading, if you deploy distributed service " +
-//                    "you should have all required classes in CLASSPATH on all nodes in topology) " +
-//                    "[senderId=" + nodeId + ", err=" + X.cause(e, ClassNotFoundException.class).getMessage() + ']');
-//            else
             U.error(log, "Failed to process message [senderId=" + nodeId + ", messageType=" + cacheMsg.getClass() + ']', e);
         }
         finally {
@@ -298,6 +297,61 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         }
     }
 
+    private void sendResponseOnFailedMessage(UUID nodeId, GridCacheMessage res, GridCacheContext ctx) {
+        try {
+            ctx.io().send(nodeId, res, ctx.ioPolicy());
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId +
+                ",res=" + res + ']', e);
+        }
+    }
+
+    private void processFailedMessage(UUID nodeId, GridCacheMessage msg) {
+        GridCacheContext ctx = cctx.cacheContext(msg.cacheId());
+
+        switch (msg.directType()) {
+            case 38: {
+                GridDhtAtomicUpdateRequest req = (GridDhtAtomicUpdateRequest)msg;
+
+                GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion());
+
+                res.onError(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, ctx);
+            }
+
+            break;
+            case 40: {
+                GridNearAtomicUpdateRequest req = (GridNearAtomicUpdateRequest)msg;
+
+                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
+                    nodeId,
+                    req.futureVersion());
+
+                res.onError(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, ctx);
+            }
+
+            break;
+            case 49: {
+                GridNearGetRequest req = (GridNearGetRequest)msg;
+
+                GridNearGetResponse res = new GridNearGetResponse(ctx.cacheId(),
+                    req.futureId(),
+                    req.miniId(),
+                    req.version());
+
+                res.error(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, ctx);
+            }
+
+            break;
+        }
+    }
+
     /**
      * @param cacheMsg Cache message to get start future.
      * @return Preloader start future.
@@ -738,11 +792,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             cacheMsg.finishUnmarshal(cctx, cctx.deploy().globalLoader());
         }
         catch (IgniteCheckedException e) {
-//            if (cacheMsg.ignoreClassErrors() && X.hasCause(e, InvalidClassException.class,
-//                    ClassNotFoundException.class, NoClassDefFoundError.class, UnsupportedClassVersionError.class))
-                cacheMsg.onClassError(e);
-//            else
-//                throw e;
+            cacheMsg.onClassError(e);
         }
         catch (Error e) {
             if (cacheMsg.ignoreClassErrors() && X.hasCause(e, NoClassDefFoundError.class,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a47974c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index c6ebe0a..d85bc75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -606,25 +606,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) {
         assert ctx.affinityNode();
 
-        if (req.classError() != null) {
-            GridNearGetResponse res = new GridNearGetResponse(ctx.cacheId(),
-                req.futureId(),
-                req.miniId(),
-                req.version());
-
-            res.error(req.classError());
-
-            try {
-                ctx.io().send(nodeId, res, ctx.ioPolicy());
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId +
-                    ",req=" + req + ", res=" + res + ']', e);
-            }
-
-            return;
-        }
-
         long ttl = req.accessTtl();
 
         final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(ttl);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a47974c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index ec9e5a6..85f11b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2329,18 +2329,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         req.nodeId(ctx.localNodeId());
 
-        if (req.classError() != null) {
-            GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
-                nodeId,
-                req.futureVersion());
-
-            res.onError(req.classError());
-
-            sendNearUpdateReply(nodeId, res);
-        }
-        else {
-            updateAllAsyncInternal(nodeId, req, updateReplyClos);
-        }
+        updateAllAsyncInternal(nodeId, req, updateReplyClos);
     }
 
     /**
@@ -2376,24 +2365,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         // Always send update reply.
         GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion());
 
-        if (req.classError() != null) {
-            res.onError(req.classError());
-
-            try {
-                ctx.io().send(nodeId, res, ctx.ioPolicy());
-            }
-            catch (ClusterTopologyCheckedException ignored) {
-                U.warn(log, "Failed to send DHT atomic update response to node because it left grid: " +
-                    req.nodeId());
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send DHT atomic update response (did node leave grid?) [nodeId=" + nodeId +
-                    ", req=" + req + ']', e);
-            }
-
-            return;
-        }
-
         Boolean replicate = ctx.isDrEnabled();
 
         boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a47974c3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
index d21c219..60f2226 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
@@ -26,11 +26,11 @@ import java.io.*;
 import java.util.concurrent.atomic.*;
 
 /**
- * Check behavior on exception while unmarshalling key
+ * Check behavior on exception while unmarshalling key.
  */
 public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTest {
-    /** Allows to change behavior of readExternal method */
-    private static AtomicInteger nodeCnt = new AtomicInteger();
+    /** Allows to change behavior of readExternal method. */
+    private static AtomicInteger readCnt = new AtomicInteger();
 
     /** {@inheritDoc} */
     @Override protected int gridCount() {
@@ -67,18 +67,18 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
         return cfg;
     }
 
-    /** Test key 1 */
+    /** Test key 1. */
     public static class TestKey implements Externalizable {
-        /** Test key 1 */
+        /** Test key 1. */
         public TestKey(String field) {
             this.field = field;
         }
 
-        /** Test key 1 */
+        /** Test key 1. */
         public TestKey() {
         }
 
-        /** field */
+        /** field. */
         private String field;
 
         /** {@inheritDoc} */
@@ -105,29 +105,28 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            if (nodeCnt.decrementAndGet() < 1) { //will throw exception on backup node only
+            if (readCnt.decrementAndGet() <= 0) { //will throw exception on backup node only
                 throw new IOException("Class can not be unmarshalled");
             }
         }
     }
 
     /**
-     * Test key 2.
-     * Unmarshalling always failed.
+     * Test key 2. Unmarshalling always failed.
      */
     public static class TestKeyAlwaysFailed extends TestKey {
-        /** Test key 2 */
+        /** Test key 2. */
         public TestKeyAlwaysFailed(String field) {
             super(field);
         }
 
-        /** Test key 2 */
+        /** Test key 2. */
         public TestKeyAlwaysFailed() {
         }
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            nodeCnt.decrementAndGet();
+            readCnt.decrementAndGet();
             throw new IOException("Class can not be unmarshalled"); //will throw exception on primary node
         }
 
@@ -138,20 +137,23 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
      */
     public void testResponseMessageOnUnmarshallingFailed() {
 
-        nodeCnt.set(1);
+        //Checking failed unmarshalling on primary node.
+        readCnt.set(1);
 
         try {
-            jcache(0).put(new TestKeyAlwaysFailed("1"), "");
+            jcache(0).put(new TestKeyAlwaysFailed("1"), ""); //put will fail at primary node.
+
             assert false : "p2p marshalling failed, but error response was not sent";
         }
         catch (CacheException e) {
             assert X.hasCause(e, IOException.class);
         }
 
-        assert nodeCnt.get() == 0;//put request should not go to backup node in case failed at primary.
+        assert readCnt.get() == 0; //put request should not be handled by backup node in case failed at primary.
 
         try {
             assert jcache(0).get(new TestKeyAlwaysFailed("1")) == null;
+
             assert false : "p2p marshalling failed, but error response was not sent";
         }
         catch (CacheException e) {
@@ -160,20 +162,22 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
 
         assert grid(0).cachex().entrySet().size() == 0;
 
-        nodeCnt.set(2); //put request will be unmarshalled twice (at primary and at backup node).
+        //Checking failed unmarshalling on backup node.
+        readCnt.set(2); //put request will be unmarshalled twice (at primary and at backup node).
 
         try {
-            jcache(0).put(new TestKey("1"), "");//put will fail at backup node.
+            jcache(0).put(new TestKey("1"), ""); //put will fail at backup node only.
+
             assert false : "p2p marshalling failed, but error response was not sent";
         }
         catch (CacheException e) {
             assert X.hasCause(e, IOException.class);
         }
 
-        assert nodeCnt.get() == 0;//put request should go to primary and backup node.
+        assert readCnt.get() == 0; //put request should be handled by primary and backup node.
 
-        // Need to have to exception while unmarshalling getKeyResponse.
-        nodeCnt.set(3); //get response will me unmarshalled twice (request at primary node and response at client).
+        // Need to have no exception while unmarshalling getKeyResponse.
+        readCnt.set(3); //get response will me unmarshalled twice (request at primary node and response at client).
 
         assert jcache(0).get(new TestKey("1")) == null;