You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/08/19 06:01:21 UTC
[1/4] ignite git commit: IGNITE-3694
IgfsLocalSecondaryFileSystemDualAsyncSelfTest.testAppendConsistencyMultithreaded
hangs
Repository: ignite
Updated Branches:
refs/heads/ignite-3547-1 05a84654d -> e97a162be
IGNITE-3694 IgfsLocalSecondaryFileSystemDualAsyncSelfTest.testAppendConsistencyMultithreaded hangs
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/97d1a6f6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/97d1a6f6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/97d1a6f6
Branch: refs/heads/ignite-3547-1
Commit: 97d1a6f6f1d30507ad93122eb9ea63285cdffde7
Parents: 476081b
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Thu Aug 18 15:06:04 2016 +0300
Committer: tledkov-gridgain <tl...@gridgain.com>
Committed: Thu Aug 18 17:59:25 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/processors/igfs/IgfsOutputStreamImpl.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/97d1a6f6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index f6b1104..bbff93b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -315,6 +315,11 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
", fileInfo=" + fileInfo + ']', e);
}
+ // Finish the batch before unlocking the file to uphold the assertion that the batch of an unlocked file,
+ // if any, must already be in the finishing state (e.g. on append; see IgfsImpl.newBatch for details).
+ if (batch != null)
+ batch.finish();
+
// Unlock the file after data is flushed.
try {
if (flushSuccess && space > 0)
@@ -332,8 +337,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
// Finally, await secondary file system flush.
if (batch != null) {
- batch.finish();
-
if (mode == DUAL_SYNC) {
try {
batch.await();
[2/4] ignite git commit: IGNITE-2559 Fixed Transaction hangs if entry
processor is not serializable. This closes #951.
Posted by sb...@apache.org.
IGNITE-2559 Fixed Transaction hangs if entry processor is not serializable. This closes #951.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8aa534a6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8aa534a6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8aa534a6
Branch: refs/heads/ignite-3547-1
Commit: 8aa534a6124c066801e6627f36179934653fe59f
Parents: 97d1a6f
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Aug 18 18:21:22 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Aug 18 18:21:22 2016 +0300
----------------------------------------------------------------------
.../GridNearPessimisticTxPrepareFuture.java | 2 +
.../near/GridNearTxFinishFuture.java | 28 +-
.../cache/distributed/near/GridNearTxLocal.java | 10 +-
.../cache/transactions/IgniteTxHandler.java | 1 -
.../CacheEntryProcessorNonSerializableTest.java | 410 +++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
6 files changed, 437 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8aa534a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 5d347d7..ef2edc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -291,6 +291,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
}
fut.onError(e);
+
+ break;
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8aa534a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 39f3ff3..adde63c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -97,7 +97,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
@GridToStringInclude
private GridNearTxLocal tx;
- /** Commit flag. */
+ /** Commit flag. This flag is used only for one-phase commit transactions. */
private boolean commit;
/** Node mappings. */
@@ -313,6 +313,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
}
+ if (commit && tx.commitError() != null)
+ err = tx.commitError();
+
if (initialized() || err != null) {
if (tx.needCheckBackup()) {
assert tx.onePhaseCommit();
@@ -386,9 +389,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
/**
* Initializes future.
+ *
+ * @param commit Commit flag.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- void finish() {
+ void finish(boolean commit) {
if (tx.onNeedCheckBackup()) {
assert tx.onePhaseCommit();
@@ -403,15 +408,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
try {
if (tx.finish(commit) || (!commit && tx.state() == UNKNOWN)) {
- if ((tx.onePhaseCommit() && needFinishOnePhase()) || (!tx.onePhaseCommit() && mappings != null)) {
+ if ((tx.onePhaseCommit() && needFinishOnePhase(commit)) || (!tx.onePhaseCommit() && mappings != null)) {
if (mappings.single()) {
GridDistributedTxMapping mapping = mappings.singleMapping();
if (mapping != null)
- finish(mapping);
+ finish(mapping, commit);
}
else
- finish(mappings.mappings());
+ finish(mappings.mappings(), commit);
}
markInitialized();
@@ -543,13 +548,14 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/**
+ * @param commit Commit flag.
* @return {@code True} if need to send finish request for one phase commit transaction.
*/
- private boolean needFinishOnePhase() {
+ private boolean needFinishOnePhase(boolean commit) {
if (tx.mappings().empty())
return false;
- boolean finish = tx.txState().hasNearCache(cctx);
+ boolean finish = tx.txState().hasNearCache(cctx) || !commit;
if (finish) {
GridDistributedTxMapping mapping = tx.mappings().singleMapping();
@@ -605,17 +611,19 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
/**
* @param mappings Mappings.
+ * @param commit Commit flag.
*/
- private void finish(Iterable<GridDistributedTxMapping> mappings) {
+ private void finish(Iterable<GridDistributedTxMapping> mappings, boolean commit) {
// Create mini futures.
for (GridDistributedTxMapping m : mappings)
- finish(m);
+ finish(m, commit);
}
/**
* @param m Mapping.
+ * @param commit Commit flag.
*/
- private void finish(GridDistributedTxMapping m) {
+ private void finish(GridDistributedTxMapping m, boolean commit) {
ClusterNode n = m.node();
assert !m.empty();
http://git-wip-us.apache.org/repos/asf/ignite/blob/8aa534a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 62cf74b..28c60d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -857,19 +857,19 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
// Make sure that here are no exceptions.
prepareFut.get();
- fut0.finish();
+ fut0.finish(true);
}
catch (Error | RuntimeException e) {
COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e);
- fut0.onDone(e);
+ fut0.finish(false);
throw e;
}
catch (IgniteCheckedException e) {
COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e);
- fut0.onDone(e);
+ fut0.finish(false);
}
}
});
@@ -917,7 +917,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
log.debug("Got optimistic tx failure [tx=" + this + ", err=" + e + ']');
}
- fut.finish();
+ fut.finish(false);
}
else {
prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
@@ -933,7 +933,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
GridNearTxFinishFuture fut0 = rollbackFut;
- fut0.finish();
+ fut0.finish(false);
}
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8aa534a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index ba30e10..7c3c206 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -639,7 +639,6 @@ public class IgniteTxHandler {
", node=" + nodeId + ']');
}
-
fut.onResult(nodeId, res);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8aa534a6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheEntryProcessorNonSerializableTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheEntryProcessorNonSerializableTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheEntryProcessorNonSerializableTest.java
new file mode 100644
index 0000000..79aa34f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheEntryProcessorNonSerializableTest.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.NotSerializableException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
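+ * Checks that invoking a non-serializable entry processor fails with {@link NotSerializableException} and keeps the old value.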
+ */
+public class CacheEntryProcessorNonSerializableTest extends GridCommonAbstractTest {
+ /** */
+ private static final int EXPECTED_VALUE = 42;
+
+ /** */
+ private static final int WRONG_VALUE = -1;
+
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int NODES = 3;
+
+ /** */
+ public static final int ITERATION_CNT = 1;
+
+ /** The single cache key used by every put, invoke and get in this test (despite the plural name). */
+ public static final int KEYS = 10;
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+ cfg.setClientMode(client);
+ cfg.setMarshaller(new OptimizedMarshaller());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(getServerNodeCount());
+
+ client = true;
+
+ startGrid(getServerNodeCount());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @return Server nodes.
+ */
+ private int getServerNodeCount() {
+ return NODES;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticOnePhaseCommit() throws Exception {
+ CacheConfiguration ccfg = cacheConfiguration(PRIMARY_SYNC, 1);
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticOnePhaseCommitWithNearCache() throws Exception {
+ CacheConfiguration ccfg = cacheConfiguration(PRIMARY_SYNC, 1)
+ .setNearConfiguration(new NearCacheConfiguration());
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticOnePhaseCommitFullSync() throws Exception {
+ CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 1);
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticOnePhaseCommitFullSyncWithNearCache() throws Exception {
+ CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 1)
+ .setNearConfiguration(new NearCacheConfiguration());
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimistic() throws Exception {
+ CacheConfiguration ccfg = cacheConfiguration(PRIMARY_SYNC, 2);
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticWithNearCache() throws Exception {
+ CacheConfiguration ccfg = cacheConfiguration(PRIMARY_SYNC, 2)
+ .setNearConfiguration(new NearCacheConfiguration());
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticFullSync() throws Exception {
+ CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 2);
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+ doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOptimisticOnePhaseCommit() throws Exception {
+ CacheConfiguration ccfg = cacheConfiguration(PRIMARY_SYNC, 1);
+
+ doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+ doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+
+ doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOptimisticOnePhaseCommitFullSync() throws Exception {
+ CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 1);
+
+ doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+ doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+
+ doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOptimisticOnePhaseCommitFullSyncWithNearCache() throws Exception {
+ CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 1)
+ .setNearConfiguration(new NearCacheConfiguration());
+
+ doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+ doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+
+ doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOptimistic() throws Exception {
+ CacheConfiguration ccfg = cacheConfiguration(PRIMARY_SYNC, 2);
+
+ doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+ doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+
+ doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOptimisticFullSync() throws Exception {
+ CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 2);
+
+ doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+ doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+
+ doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+ }
+
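+ /** NOTE(review): no near cache is configured here, so this currently duplicates testOptimisticFullSync - confirm intent.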
+ * @throws Exception If failed.
+ */
+ public void testOptimisticFullSyncWithNearCache() throws Exception {
+ CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 2);
+
+ doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+ doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+
+ doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @param ccfg Cache configuration.
+ * @throws Exception If failed.
+ */
+ private void doTestInvokeTest(CacheConfiguration ccfg, TransactionConcurrency txConcurrency,
+ TransactionIsolation txIsolation) throws Exception {
+ IgniteEx cln = grid(getServerNodeCount());
+
+ grid(0).createCache(ccfg);
+
+ IgniteCache clnCache;
+
+ if (ccfg.getNearConfiguration() != null)
+ clnCache = cln.createNearCache(ccfg.getName(), ccfg.getNearConfiguration());
+ else
+ clnCache = cln.cache(ccfg.getName());
+
+ putKeys(clnCache, EXPECTED_VALUE);
+
+ try {
+ // Explicit tx.
+ for (int i = 0; i < ITERATION_CNT; i++) {
+ try (final Transaction tx = cln.transactions().txStart(txConcurrency, txIsolation)) {
+ putKeys(clnCache, WRONG_VALUE);
+
+ clnCache.invoke(KEYS, new NonSerialazibleEntryProcessor());
+
+ GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ tx.commit();
+
+ return null;
+ }
+ }, NotSerializableException.class);
+ }
+
+ checkKeys(clnCache, EXPECTED_VALUE);
+ }
+
+ // From affinity node.
+ Ignite grid = grid(ThreadLocalRandom.current().nextInt(NODES));
+
+ final IgniteCache cache = grid.cache(ccfg.getName());
+
+ // Explicit tx.
+ for (int i = 0; i < ITERATION_CNT; i++) {
+ try (final Transaction tx = grid.transactions().txStart(txConcurrency, txIsolation)) {
+ putKeys(cache, WRONG_VALUE);
+
+ cache.invoke(KEYS, new NonSerialazibleEntryProcessor());
+
+ GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ tx.commit();
+
+ return null;
+ }
+ }, NotSerializableException.class);
+ }
+
+ checkKeys(cache, EXPECTED_VALUE);
+ }
+
+ final IgniteCache clnCache0 = clnCache;
+
+ // Implicit tx.
+ for (int i = 0; i < ITERATION_CNT; i++) {
+ GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ clnCache0.invoke(KEYS, new NonSerialazibleEntryProcessor());
+
+ return null;
+ }
+ }, NotSerializableException.class);
+ }
+
+ checkKeys(clnCache, EXPECTED_VALUE);
+ }
+ finally {
+ grid(0).destroyCache(ccfg.getName());
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param val Value.
+ */
+ private void putKeys(IgniteCache cache, int val) {
+ cache.put(KEYS, val);
+ }
+
+ /**
+ * @param cache Cache.
+ * @param expVal Expected value.
+ */
+ private void checkKeys(IgniteCache cache, int expVal) {
+ assertEquals(expVal, cache.get(KEYS));
+ }
+
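+ /** NOTE(review): {@code wrMode} only affects the cache name; the sync mode is always FULL_SYNC - confirm intent.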
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration(CacheWriteSynchronizationMode wrMode, int backup) {
+ return new CacheConfiguration("test-cache-" + wrMode + "-" + backup)
+ .setAtomicityMode(TRANSACTIONAL)
+ .setWriteSynchronizationMode(FULL_SYNC)
+ .setBackups(backup);
+ }
+
+ /**
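+ * Entry processor that does not implement {@code Serializable}; the tests expect its use to fail with NotSerializableException.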
+ */
+ private static class NonSerialazibleEntryProcessor implements EntryProcessor<Integer, Integer, Integer> {
+ /** {@inheritDoc} */
+ @Override public Integer process(MutableEntry<Integer, Integer> entry, Object... arguments)
+ throws EntryProcessorException {
+ entry.setValue(42);
+
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8aa534a6/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 8c3f4de..84e1502 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -137,6 +137,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheGet
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearTxExceptionSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxExceptionSelfTest;
import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxExceptionSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheEntryProcessorNonSerializableTest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerImplSelfTest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerMultiThreadedSelfTest;
@@ -186,6 +187,7 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheAtomicLocalInvokeTest.class);
suite.addTestSuite(IgniteCacheAtomicLocalWithStoreInvokeTest.class);
suite.addTestSuite(IgniteCacheTxInvokeTest.class);
+ suite.addTestSuite(CacheEntryProcessorNonSerializableTest.class);
suite.addTestSuite(IgniteCacheEntryProcessorCallTest.class);
GridTestUtils.addTestIfNeeded(suite, CacheEntryProcessorCopySelfTest.class, ignoredTests);
suite.addTestSuite(IgniteCacheTxNearEnabledInvokeTest.class);
[3/4] ignite git commit: Merge remote-tracking branch
'remotes/community/ignite-1.6.6' into ignite-3547-1
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/community/ignite-1.6.6' into ignite-3547-1
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/09e7fb7a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/09e7fb7a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/09e7fb7a
Branch: refs/heads/ignite-3547-1
Commit: 09e7fb7aeea2b47c7e542acb46822b0305a69f74
Parents: 05a8465 8aa534a
Author: sboikov <sb...@gridgain.com>
Authored: Fri Aug 19 08:59:18 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Aug 19 08:59:18 2016 +0300
----------------------------------------------------------------------
.../GridNearPessimisticTxPrepareFuture.java | 2 +
.../near/GridNearTxFinishFuture.java | 28 +-
.../cache/distributed/near/GridNearTxLocal.java | 10 +-
.../cache/transactions/IgniteTxHandler.java | 1 -
.../processors/igfs/IgfsOutputStreamImpl.java | 7 +-
.../CacheEntryProcessorNonSerializableTest.java | 410 +++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
7 files changed, 442 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/09e7fb7a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
[4/4] ignite git commit: ignite-3547
Posted by sb...@apache.org.
ignite-3547
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e97a162b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e97a162b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e97a162b
Branch: refs/heads/ignite-3547-1
Commit: e97a162be440830c844f108741c7d46625331a32
Parents: 09e7fb7
Author: sboikov <sb...@gridgain.com>
Authored: Fri Aug 19 09:01:07 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Aug 19 09:01:07 2016 +0300
----------------------------------------------------------------------
.../processors/cache/CacheSerializableTransactionsTest.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e97a162b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index 4baef66..3d4f850 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -3311,6 +3311,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
private void getRemoveTx(boolean nearCache, boolean store) throws Exception {
+ long stopTime = U.currentTimeMillis() + getTestTimeout() - 30_000;
+
final Ignite ignite0 = ignite(0);
CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, store, false);
@@ -3330,6 +3332,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
for (int i = 0; i < 100; i++) {
+ if (U.currentTimeMillis() > stopTime)
+ break;
+
final AtomicInteger cntr = new AtomicInteger();
final Integer key = i;