You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/06 14:07:02 UTC
[03/36] incubator-ignite git commit: ignite-646
ignite-646
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a47974c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a47974c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a47974c3
Branch: refs/heads/ignite-695
Commit: a47974c3444da2f1804ca1c0b80cd74f92cdf137
Parents: 7a8e075
Author: avinogradov <av...@gridgain.com>
Authored: Tue Apr 21 16:33:13 2015 +0300
Committer: avinogradov <av...@gridgain.com>
Committed: Tue Apr 21 16:33:13 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheIoManager.java | 160 ++++++++++++-------
.../distributed/dht/GridDhtCacheAdapter.java | 19 ---
.../dht/atomic/GridDhtAtomicCache.java | 31 +---
.../IgniteCacheP2pUnmarshallingErrorTest.java | 46 +++---
4 files changed, 131 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a47974c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 56ee65e..0df824f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -24,6 +24,8 @@ import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.managers.deployment.*;
import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
@@ -32,7 +34,6 @@ import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
-import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -226,68 +227,66 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
unmarshall(nodeId, cacheMsg);
- if (cacheMsg.allowForStartup())
- processMessage(nodeId, cacheMsg, c);
+ if (cacheMsg.classError() != null)
+ processFailedMessage(nodeId, cacheMsg);
else {
- IgniteInternalFuture<?> startFut = startFuture(cacheMsg);
-
- if (startFut.isDone())
+ if (cacheMsg.allowForStartup())
processMessage(nodeId, cacheMsg, c);
else {
- if (log.isDebugEnabled())
- log.debug("Waiting for start future to complete for message [nodeId=" + nodeId +
- ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']');
-
- // Don't hold this thread waiting for preloading to complete.
- startFut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(final IgniteInternalFuture<?> f) {
- cctx.kernalContext().closure().runLocalSafe(
- new GridPlainRunnable() {
- @Override public void run() {
- rw.readLock();
-
- try {
- if (stopping) {
- if (log.isDebugEnabled())
- log.debug("Received cache communication message while stopping " +
- "(will ignore) [nodeId=" + nodeId + ", msg=" + cacheMsg + ']');
+ IgniteInternalFuture<?> startFut = startFuture(cacheMsg);
- return;
- }
+ if (startFut.isDone())
+ processMessage(nodeId, cacheMsg, c);
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Waiting for start future to complete for message [nodeId=" + nodeId +
+ ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']');
- f.get();
+ // Don't hold this thread waiting for preloading to complete.
+ startFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(final IgniteInternalFuture<?> f) {
+ cctx.kernalContext().closure().runLocalSafe(
+ new GridPlainRunnable() {
+ @Override public void run() {
+ rw.readLock();
- if (log.isDebugEnabled())
- log.debug("Start future completed for message [nodeId=" + nodeId +
- ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']');
+ try {
+ if (stopping) {
+ if (log.isDebugEnabled())
+ log.debug("Received cache communication message while stopping " +
+ "(will ignore) [nodeId=" + nodeId + ", msg=" + cacheMsg + ']');
- processMessage(nodeId, cacheMsg, c);
- }
- catch (IgniteCheckedException e) {
- // Log once.
- if (startErr.compareAndSet(false, true))
- U.error(log, "Failed to complete preload start future " +
- "(will ignore message) " +
- "[fut=" + f + ", nodeId=" + nodeId + ", msg=" + cacheMsg + ']', e);
- }
- finally {
- rw.readUnlock();
+ return;
+ }
+
+ f.get();
+
+ if (log.isDebugEnabled())
+ log.debug("Start future completed for message [nodeId=" + nodeId +
+ ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']');
+
+ processMessage(nodeId, cacheMsg, c);
+ }
+ catch (IgniteCheckedException e) {
+ // Log once.
+ if (startErr.compareAndSet(false, true))
+ U.error(log, "Failed to complete preload start future " +
+ "(will ignore message) " +
+ "[fut=" + f + ", nodeId=" + nodeId + ", msg=" + cacheMsg + ']', e);
+ }
+ finally {
+ rw.readUnlock();
+ }
}
}
- }
- );
- }
- });
+ );
+ }
+ });
+ }
}
}
}
catch (Throwable e) {
-// if (X.hasCause(e, ClassNotFoundException.class))
-// U.error(log, "Failed to process message (note that distributed services " +
-// "do not support peer class loading, if you deploy distributed service " +
-// "you should have all required classes in CLASSPATH on all nodes in topology) " +
-// "[senderId=" + nodeId + ", err=" + X.cause(e, ClassNotFoundException.class).getMessage() + ']');
-// else
U.error(log, "Failed to process message [senderId=" + nodeId + ", messageType=" + cacheMsg.getClass() + ']', e);
}
finally {
@@ -298,6 +297,61 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
}
}
+ private void sendResponseOnFailedMessage(UUID nodeId, GridCacheMessage res, GridCacheContext ctx) {
+ try {
+ ctx.io().send(nodeId, res, ctx.ioPolicy());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId +
+ ",res=" + res + ']', e);
+ }
+ }
+
+ private void processFailedMessage(UUID nodeId, GridCacheMessage msg) {
+ GridCacheContext ctx = cctx.cacheContext(msg.cacheId());
+
+ switch (msg.directType()) {
+ case 38: {
+ GridDhtAtomicUpdateRequest req = (GridDhtAtomicUpdateRequest)msg;
+
+ GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion());
+
+ res.onError(req.classError());
+
+ sendResponseOnFailedMessage(nodeId, res, ctx);
+ }
+
+ break;
+ case 40: {
+ GridNearAtomicUpdateRequest req = (GridNearAtomicUpdateRequest)msg;
+
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
+ nodeId,
+ req.futureVersion());
+
+ res.onError(req.classError());
+
+ sendResponseOnFailedMessage(nodeId, res, ctx);
+ }
+
+ break;
+ case 49: {
+ GridNearGetRequest req = (GridNearGetRequest)msg;
+
+ GridNearGetResponse res = new GridNearGetResponse(ctx.cacheId(),
+ req.futureId(),
+ req.miniId(),
+ req.version());
+
+ res.error(req.classError());
+
+ sendResponseOnFailedMessage(nodeId, res, ctx);
+ }
+
+ break;
+ }
+ }
+
/**
* @param cacheMsg Cache message to get start future.
* @return Preloader start future.
@@ -738,11 +792,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
cacheMsg.finishUnmarshal(cctx, cctx.deploy().globalLoader());
}
catch (IgniteCheckedException e) {
-// if (cacheMsg.ignoreClassErrors() && X.hasCause(e, InvalidClassException.class,
-// ClassNotFoundException.class, NoClassDefFoundError.class, UnsupportedClassVersionError.class))
- cacheMsg.onClassError(e);
-// else
-// throw e;
+ cacheMsg.onClassError(e);
}
catch (Error e) {
if (cacheMsg.ignoreClassErrors() && X.hasCause(e, NoClassDefFoundError.class,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a47974c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index c6ebe0a..d85bc75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -606,25 +606,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) {
assert ctx.affinityNode();
- if (req.classError() != null) {
- GridNearGetResponse res = new GridNearGetResponse(ctx.cacheId(),
- req.futureId(),
- req.miniId(),
- req.version());
-
- res.error(req.classError());
-
- try {
- ctx.io().send(nodeId, res, ctx.ioPolicy());
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId +
- ",req=" + req + ", res=" + res + ']', e);
- }
-
- return;
- }
-
long ttl = req.accessTtl();
final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(ttl);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a47974c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index ec9e5a6..85f11b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2329,18 +2329,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.nodeId(ctx.localNodeId());
- if (req.classError() != null) {
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
- nodeId,
- req.futureVersion());
-
- res.onError(req.classError());
-
- sendNearUpdateReply(nodeId, res);
- }
- else {
- updateAllAsyncInternal(nodeId, req, updateReplyClos);
- }
+ updateAllAsyncInternal(nodeId, req, updateReplyClos);
}
/**
@@ -2376,24 +2365,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Always send update reply.
GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion());
- if (req.classError() != null) {
- res.onError(req.classError());
-
- try {
- ctx.io().send(nodeId, res, ctx.ioPolicy());
- }
- catch (ClusterTopologyCheckedException ignored) {
- U.warn(log, "Failed to send DHT atomic update response to node because it left grid: " +
- req.nodeId());
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send DHT atomic update response (did node leave grid?) [nodeId=" + nodeId +
- ", req=" + req + ']', e);
- }
-
- return;
- }
-
Boolean replicate = ctx.isDrEnabled();
boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a47974c3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
index d21c219..60f2226 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
@@ -26,11 +26,11 @@ import java.io.*;
import java.util.concurrent.atomic.*;
/**
- * Check behavior on exception while unmarshalling key
+ * Check behavior on exception while unmarshalling key.
*/
public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTest {
- /** Allows to change behavior of readExternal method */
- private static AtomicInteger nodeCnt = new AtomicInteger();
+ /** Allows to change behavior of readExternal method. */
+ private static AtomicInteger readCnt = new AtomicInteger();
/** {@inheritDoc} */
@Override protected int gridCount() {
@@ -67,18 +67,18 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
return cfg;
}
- /** Test key 1 */
+ /** Test key 1. */
public static class TestKey implements Externalizable {
- /** Test key 1 */
+ /** Test key 1. */
public TestKey(String field) {
this.field = field;
}
- /** Test key 1 */
+ /** Test key 1. */
public TestKey() {
}
- /** field */
+ /** field. */
private String field;
/** {@inheritDoc} */
@@ -105,29 +105,28 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- if (nodeCnt.decrementAndGet() < 1) { //will throw exception on backup node only
+ if (readCnt.decrementAndGet() <= 0) { //will throw exception on backup node only
throw new IOException("Class can not be unmarshalled");
}
}
}
/**
- * Test key 2.
- * Unmarshalling always failed.
+ * Test key 2. Unmarshalling always fails.
*/
public static class TestKeyAlwaysFailed extends TestKey {
- /** Test key 2 */
+ /** Test key 2. */
public TestKeyAlwaysFailed(String field) {
super(field);
}
- /** Test key 2 */
+ /** Test key 2. */
public TestKeyAlwaysFailed() {
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- nodeCnt.decrementAndGet();
+ readCnt.decrementAndGet();
throw new IOException("Class can not be unmarshalled"); //will throw exception on primary node
}
@@ -138,20 +137,23 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
*/
public void testResponseMessageOnUnmarshallingFailed() {
- nodeCnt.set(1);
+ //Checking failed unmarshalling on primary node.
+ readCnt.set(1);
try {
- jcache(0).put(new TestKeyAlwaysFailed("1"), "");
+ jcache(0).put(new TestKeyAlwaysFailed("1"), ""); //put will fail at primary node.
+
assert false : "p2p marshalling failed, but error response was not sent";
}
catch (CacheException e) {
assert X.hasCause(e, IOException.class);
}
- assert nodeCnt.get() == 0;//put request should not go to backup node in case failed at primary.
+ assert readCnt.get() == 0; //put request should not be handled by backup node if it failed at primary.
try {
assert jcache(0).get(new TestKeyAlwaysFailed("1")) == null;
+
assert false : "p2p marshalling failed, but error response was not sent";
}
catch (CacheException e) {
@@ -160,20 +162,22 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
assert grid(0).cachex().entrySet().size() == 0;
- nodeCnt.set(2); //put request will be unmarshalled twice (at primary and at backup node).
+ //Checking failed unmarshalling on backup node.
+ readCnt.set(2); //put request will be unmarshalled twice (at primary and at backup node).
try {
- jcache(0).put(new TestKey("1"), "");//put will fail at backup node.
+ jcache(0).put(new TestKey("1"), ""); //put will fail at backup node only.
+
assert false : "p2p marshalling failed, but error response was not sent";
}
catch (CacheException e) {
assert X.hasCause(e, IOException.class);
}
- assert nodeCnt.get() == 0;//put request should go to primary and backup node.
+ assert readCnt.get() == 0; //put request should be handled by primary and backup node.
- // Need to have to exception while unmarshalling getKeyResponse.
- nodeCnt.set(3); //get response will me unmarshalled twice (request at primary node and response at client).
+ // Need to have no exception while unmarshalling getKeyResponse.
+ readCnt.set(3); //get response will be unmarshalled twice (request at primary node and response at client).
assert jcache(0).get(new TestKey("1")) == null;