You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/06 14:07:12 UTC
[13/36] incubator-ignite git commit: ignite-646
ignite-646
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a8a9d108
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a8a9d108
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a8a9d108
Branch: refs/heads/ignite-695
Commit: a8a9d1089d3bf3d6a4adb56a71aede309fa42977
Parents: 163be30
Author: avinogradov <av...@gridgain.com>
Authored: Mon Apr 27 17:41:10 2015 +0300
Committer: avinogradov <av...@gridgain.com>
Committed: Mon Apr 27 17:41:10 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheIoManager.java | 72 +++++++++++-
.../atomic/GridNearAtomicUpdateResponse.java | 2 +-
.../dht/preloader/GridDhtForceKeysFuture.java | 6 +
.../dht/preloader/GridDhtForceKeysResponse.java | 19 ++++
.../IgniteCacheP2pUnmarshallingErrorTest.java | 41 ++++---
.../IgniteCacheP2pUnmarshallingErrorTxTest.java | 111 -------------------
...gniteCacheP2pUnmarshallingNearErrorTest.java | 56 ++++++++++
...CacheP2pUnmarshallingRebalanceErrorTest.java | 69 ++++++++++++
.../IgniteCacheP2pUnmarshallingTxErrorTest.java | 109 ++++++++++++++++++
...niteCacheP2pUnmarshallingQueryErrorTest.java | 58 ++++++++++
10 files changed, 410 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8a9d108/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index c9af788..14d6f7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.managers.deployment.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.lang.*;
@@ -301,7 +302,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
}
}
- private void sendResponseOnFailedMessage(UUID nodeId, GridCacheMessage res, GridCacheSharedContext cctx, GridIoPolicy plc) {
+ private void sendResponseOnFailedMessage(UUID nodeId, GridCacheMessage res, GridCacheSharedContext cctx,
+ GridIoPolicy plc) {
try {
cctx.io().send(nodeId, res, plc);
}
@@ -311,10 +313,24 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
}
}
- private void processFailedMessage(UUID nodeId, GridCacheMessage msg) throws IgniteCheckedException{
+ private void processFailedMessage(UUID nodeId, GridCacheMessage msg) throws IgniteCheckedException {
GridCacheContext ctx = cctx.cacheContext(msg.cacheId());
switch (msg.directType()) {
+ case 14: {
+ GridCacheEvictionRequest req = (GridCacheEvictionRequest)msg;
+
+ GridCacheEvictionResponse res = new GridCacheEvictionResponse(
+ ctx.cacheId(),
+ req.futureId(),
+ req.classError() != null
+ );
+
+ sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ }
+
+ break;
+
case 30: {
GridDhtLockRequest req = (GridDhtLockRequest)msg;
@@ -330,7 +346,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
break;
- case 34:{
+ case 34: {
GridDhtTxPrepareRequest req = (GridDhtTxPrepareRequest)msg;
GridDhtTxPrepareResponse res = new GridDhtTxPrepareResponse(
@@ -367,13 +383,37 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
nodeId,
req.futureVersion());
- res.onError(req.classError());
+ res.error(req.classError());
+
+ sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ }
+
+ break;
+
+ case 42: {
+ GridDhtForceKeysRequest req = (GridDhtForceKeysRequest)msg;
+
+ GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(
+ ctx.cacheId(),
+ req.futureId(),
+ req.miniId()
+ );
+
+ res.error(req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
}
break;
+ case 45: {
+ GridDhtPartitionSupplyMessage req = (GridDhtPartitionSupplyMessage)msg;
+
+ U.error(log, "Supply message cannot be unmarshalled.", req.classError());
+ }
+
+ break;
+
case 49: {
GridNearGetRequest req = (GridNearGetRequest)msg;
@@ -390,6 +430,26 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
break;
+ case 50: {
+ GridNearGetResponse res = (GridNearGetResponse)msg;
+
+ GridPartitionedGetFuture fut = (GridPartitionedGetFuture)ctx.mvcc().future(
+ res.version(), res.futureId());
+
+ if (fut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
+
+ return;
+ }
+
+ res.error(res.classError());
+
+ fut.onResult(nodeId, res);
+ }
+
+ break;
+
case 51: {
GridNearLockRequest req = (GridNearLockRequest)msg;
@@ -878,7 +938,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
catch (Error e) {
if (cacheMsg.ignoreClassErrors() && X.hasCause(e, NoClassDefFoundError.class,
UnsupportedClassVersionError.class))
- cacheMsg.onClassError(new IgniteCheckedException("Failed to load class during unmarshalling: " + e, e));
+ cacheMsg.onClassError(new IgniteCheckedException("Failed to load class during unmarshalling: " + e, e));
else
throw e;
}
@@ -907,7 +967,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
}
/** {@inheritDoc} */
- @SuppressWarnings( {"CatchGenericClass", "unchecked"})
+ @SuppressWarnings({"CatchGenericClass", "unchecked"})
@Override public void onMessage(final UUID nodeId, Object msg) {
if (log.isDebugEnabled())
log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']');
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8a9d108/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 773b847..330e43c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -140,7 +140,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
* Sets update error.
* @param err
*/
- public void onError(IgniteCheckedException err){
+ public void error(IgniteCheckedException err){
this.err = err;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8a9d108/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 61aaa14..78966d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -451,6 +451,12 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
* @param res Result callback.
*/
void onResult(GridDhtForceKeysResponse res) {
+ if (res.error() != null) {
+ onDone(res.error());
+
+ return;
+ }
+
Collection<KeyCacheObject> missedKeys = res.missedKeys();
boolean remapMissed = false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8a9d108/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
index 8919185..1d49b34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
@@ -42,6 +42,10 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
/** Mini-future ID. */
private IgniteUuid miniId;
+ /** Error. */
+ @GridDirectTransient
+ private volatile IgniteCheckedException err;
+
/** Missed (not found) keys. */
@GridToStringInclude
@GridDirectCollection(KeyCacheObject.class)
@@ -73,6 +77,21 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
this.miniId = miniId;
}
+ /**
+ * Sets error.
+ * @param err
+ */
+ public void error(IgniteCheckedException err){
+ this.err = err;
+ }
+
+ /**
+ * @return Error, if any.
+ */
+ public IgniteCheckedException error() {
+ return err;
+ }
+
/** {@inheritDoc} */
@Override public boolean allowForStartup() {
return true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8a9d108/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
index 277f10d..b358b15 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.annotations.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.typedef.*;
@@ -82,6 +83,7 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
}
/** Field. */
+ @QuerySqlField(index = true)
private String field;
/** {@inheritDoc} */
@@ -110,7 +112,7 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
field = (String)in.readObject();
- if (readCnt.decrementAndGet() <= 0) { //will throw exception on backup node only
+ if (readCnt.decrementAndGet() <= 0) {
throw new IOException("Class can not be unmarshalled");
}
}
@@ -119,9 +121,9 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
/**
* Sends put atomically and handles fail.
*/
- protected void failAtomicPut() {
+ protected void failAtomicPut(int k) {
try {
- jcache(0).put(new TestKey("1"), "");
+ jcache(0).put(new TestKey(String.valueOf(k)), "");
assert false : "p2p marshalling failed, but error response was not sent";
}
@@ -135,9 +137,9 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
/**
* Sends get atomically and handles fail.
*/
- protected void failAtomicGet() {
+ protected void failAtomicGet(int k) {
try {
- jcache(0).get(new TestKey("1"));
+ jcache(0).get(new TestKey(String.valueOf(k)));
assert false : "p2p marshalling failed, but error response was not sent";
}
@@ -149,30 +151,39 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
/**
* Tests that correct response will be sent to client node in case of unmarshalling failed.
*/
- public void testResponseMessageOnUnmarshallingFailed() {
+ public void testResponseMessageOnUnmarshallingFailed() throws Exception {
//GridNearAtomicUpdateRequest unmarshalling failed test
readCnt.set(1);
- failAtomicPut();
-
- //GridNearGetRequest unmarshalling failed test
- readCnt.set(1);
-
- failAtomicGet();
+ failAtomicPut(++key);
//Check that cache is empty.
readCnt.set(100);
- assert jcache(0).get(new TestKey("1")) == null;
+ assert jcache(0).get(new TestKey(String.valueOf(key))) == null;
//GridDhtAtomicUpdateRequest unmarshalling failed test
readCnt.set(2);
- failAtomicPut();
+ failAtomicPut(++key);
//Check that cache is not empty.
readCnt.set(100);
- assert jcache(0).get(new TestKey("1")) != null;
+ assert jcache(0).get(new TestKey(String.valueOf(key))) != null;
+
+ //GridNearGetRequest unmarshalling failed test
+ readCnt.set(1);
+
+ failAtomicGet(++key);
+
+ //GridNearGetResponse unmarshalling failed test
+ readCnt.set(100);
+
+ jcache(0).put(new TestKey(String.valueOf(++key)), "");
+
+ readCnt.set(2);
+
+ failAtomicGet(key);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8a9d108/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTxTest.java
deleted file mode 100644
index 06176aa..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTxTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.transactions.*;
-
-import javax.cache.*;
-import java.io.*;
-
-/**
- * Check behavior on exception while unmarshalling key.
- */
-public class IgniteCacheP2pUnmarshallingErrorTxTest extends IgniteCacheP2pUnmarshallingErrorTest {
-
- /** {@inheritDoc} */
- @Override protected CacheAtomicityMode atomicityMode() {
- return CacheAtomicityMode.TRANSACTIONAL;
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- if (!gridName.endsWith("0"))
- cfg.getCacheConfiguration()[0].setRebalanceDelay(-1); //allows to check GridDhtLockRequest fail.
-
- return cfg;
- }
-
- /**
- * Sends put with optimistic lock and handles fail.
- */
- protected void failOptimistic() {
- try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
-
- jcache(0).put(new TestKey(String.valueOf(++key)), "");
-
- tx.commit();
-
- assert false : "p2p marshalling failed, but error response was not sent";
- }
- catch (IgniteException e) {
- assert X.hasCause(e, IOException.class);
- }
-
- assert readCnt.get() == 0; //ensure we have read count as expected.
- }
-
- /**
- * Sends put with pessimistic lock and handles fail.
- */
- protected void failPessimictic() {
- try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
-
- jcache(0).put(new TestKey(String.valueOf(++key)), "");
-
- assert false : "p2p marshalling failed, but error response was not sent";
- }
- catch (CacheException e) {
- assert X.hasCause(e, IOException.class);
- }
-
- assert readCnt.get() == 0; //ensure we have read count as expected.
- }
-
- /**
- * Tests that correct response will be sent to client node in case of unmarshalling failed.
- */
- public void testResponseMessageOnUnmarshallingFailed() {
- //GridNearTxPrepareRequest unmarshalling failed test
- readCnt.set(2);
-
- failOptimistic();
-
- //GridDhtTxPrepareRequest unmarshalling failed test
- readCnt.set(3);
-
- failOptimistic();
-
- //GridNearLockRequest unmarshalling failed test
- readCnt.set(2);
-
- failPessimictic();
-
- //GridDhtLockRequest unmarshalling failed test
- readCnt.set(3);
-
- try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
- jcache(0).put(new TestKey(String.valueOf(++key)), ""); //No failure at client side.
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8a9d108/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
new file mode 100644
index 0000000..0b9226f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.eviction.fifo.*;
+import org.apache.ignite.configuration.*;
+
+/**
+ * Check behavior on exception while unmarshalling key.
+ */
+public class IgniteCacheP2pUnmarshallingNearErrorTest extends IgniteCacheP2pUnmarshallingErrorTest {
+ /** {@inheritDoc} */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return new NearCacheConfiguration();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.getCacheConfiguration()[0].setEvictMaxOverflowRatio(0);
+ cfg.getCacheConfiguration()[0].setEvictSynchronized(true);
+ cfg.getCacheConfiguration()[0].setEvictSynchronizedKeyBufferSize(1);
+ cfg.getCacheConfiguration()[0].setEvictionPolicy(new FifoEvictionPolicy(1));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testResponseMessageOnUnmarshallingFailed() throws InterruptedException {
+ //GridCacheEvictionRequest unmarshalling failed test
+ readCnt.set(5);
+
+ jcache(0).put(new TestKey(String.valueOf(++key)), "");
+ jcache(0).put(new TestKey(String.valueOf(++key)), "");
+
+ //Eviction request unmarshalling failed but ioManager does not hang up.
+
+ Thread.sleep(1000); //todo: wait for eviction complete
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8a9d108/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java
new file mode 100644
index 0000000..f247a00
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.affinity.*;
+
+/**
+ * Check behavior on exception while unmarshalling key.
+ */
+public class IgniteCacheP2pUnmarshallingRebalanceErrorTest extends IgniteCacheP2pUnmarshallingErrorTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testResponseMessageOnUnmarshallingFailed() throws Exception {
+ //GridDhtPartitionSupplyMessage unmarshalling failed test
+ readCnt.set(100);
+
+ for (int i = 0; i <= 20; i++) {
+ jcache(0).put(new TestKey(String.valueOf(++key)), "");
+ }
+
+ readCnt.set(1);
+
+ startGrid(3);
+
+ //GridDhtPartitionSupplyMessage unmarshalling failed but ioManager does not hang up.
+
+ Thread.sleep(1000);
+
+ //GridDhtForceKeysRequest unmarshalling failed test
+ stopGrid(3);
+
+ readCnt.set(Integer.MAX_VALUE);
+
+ for (int i = 0; i <= 1000; i++) {
+ jcache(0).put(new TestKey(String.valueOf(++key)), "");
+ }
+
+ startGrid(3);
+
+ Affinity<Object> aff = affinity(grid(3).cache(null));
+
+ while (!aff.isPrimary(grid(3).localNode(), new TestKey(String.valueOf(key)))) {
+ --key;
+ }
+
+ readCnt.set(1);
+
+ jcache(3).get(new TestKey(String.valueOf(key)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8a9d108/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingTxErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingTxErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingTxErrorTest.java
new file mode 100644
index 0000000..ca48507
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingTxErrorTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.*;
+import java.io.*;
+
+/**
+ * Check behavior on exception while unmarshalling key.
+ */
+public class IgniteCacheP2pUnmarshallingTxErrorTest extends IgniteCacheP2pUnmarshallingErrorTest {
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (!gridName.endsWith("0"))
+ cfg.getCacheConfiguration()[0].setRebalanceDelay(-1); //allows to check GridDhtLockRequest fail.
+
+ return cfg;
+ }
+
+ /**
+ * Sends put with optimistic lock and handles fail.
+ */
+ protected void failOptimistic() {
+ try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+
+ jcache(0).put(new TestKey(String.valueOf(++key)), "");
+
+ tx.commit();
+
+ assert false : "p2p marshalling failed, but error response was not sent";
+ }
+ catch (IgniteException e) {
+ assert X.hasCause(e, IOException.class);
+ }
+
+ assert readCnt.get() == 0; //ensure we have read count as expected.
+ }
+
+ /**
+ * Sends put with pessimistic lock and handles fail.
+ */
+ protected void failPessimictic() {
+ try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+
+ jcache(0).put(new TestKey(String.valueOf(++key)), "");
+
+ assert false : "p2p marshalling failed, but error response was not sent";
+ }
+ catch (CacheException e) {
+ assert X.hasCause(e, IOException.class);
+ }
+
+ assert readCnt.get() == 0; //ensure we have read count as expected.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testResponseMessageOnUnmarshallingFailed() {
+ //GridNearTxPrepareRequest unmarshalling failed test
+ readCnt.set(2);
+
+ failOptimistic();
+
+ //GridDhtTxPrepareRequest unmarshalling failed test
+ readCnt.set(3);
+
+ failOptimistic();
+
+ //GridNearLockRequest unmarshalling failed test
+ readCnt.set(2);
+
+ failPessimictic();
+
+ //GridDhtLockRequest unmarshalling failed test
+ readCnt.set(3);
+
+ try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+ jcache(0).put(new TestKey(String.valueOf(++key)), ""); //No failure at client side.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8a9d108/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java
new file mode 100644
index 0000000..265490c
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+
+import javax.cache.*;
+
+/**
+ * Check behavior on exception while unmarshalling key.
+ */
+public class IgniteCacheP2pUnmarshallingQueryErrorTest extends IgniteCacheP2pUnmarshallingErrorTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.getCacheConfiguration()[0].setIndexedTypes(TestKey.class, String.class);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testResponseMessageOnUnmarshallingFailed() {
+ readCnt.set(100);
+
+ TestKey val = new TestKey(String.valueOf(++key));
+
+ jcache(0).put(val, "");
+
+ //GridCacheQueryRequest unmarshalling failed test
+ readCnt.set(1);
+
+ try {
+ jcache(0).query(new SqlQuery<TestKey, String>(String.class, "field like '" + key + "'")).getAll();
+
+ assert false : "p2p marshalling failed, but error response was not sent";
+ }
+ catch (CacheException e) {
+ // No-op
+ }
+ }
+}