You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/23 10:41:03 UTC
[01/12] ignite git commit: IGNITE-3907 Fixed "Incorrect
initialization CQ when node filter configured for cache"
Repository: ignite
Updated Branches:
refs/heads/ignite-1.6.8-hadoop ee5a818ba -> 4d08b5cb5
IGNITE-3907 Fixed "Incorrect initialization CQ when node filter configured for cache"
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ebf354c5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ebf354c5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ebf354c5
Branch: refs/heads/ignite-1.6.8-hadoop
Commit: ebf354c568d0802b7eed1cc6b9d251941dbce014
Parents: 2474e2b
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Fri Sep 16 14:32:13 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Fri Sep 16 14:32:13 2016 +0300
----------------------------------------------------------------------
.../internal/GridEventConsumeHandler.java | 5 -
.../internal/GridMessageListenHandler.java | 5 -
.../continuous/CacheContinuousQueryHandler.java | 5 -
.../continuous/GridContinuousHandler.java | 8 -
.../continuous/GridContinuousProcessor.java | 33 ++--
...eContinuousQueryMultiNodesFilteringTest.java | 161 +++++++++++++++++++
6 files changed, 170 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index b4b1e58..ed6998d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -262,11 +262,6 @@ class GridEventConsumeHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
- @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
- // No-op.
- }
-
- /** {@inheritDoc} */
@Override public void unregister(UUID routineId, GridKernalContext ctx) {
assert routineId != null;
assert ctx != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 2b8041d..1bca85c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -139,11 +139,6 @@ public class GridMessageListenHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
- @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
- // No-op.
- }
-
- /** {@inheritDoc} */
@Override public void unregister(UUID routineId, GridKernalContext ctx) {
ctx.io().removeUserMessageListener(topic, pred);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 7b3b47b..a5752ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -564,11 +564,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
/** {@inheritDoc} */
- @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
- // No-op.
- }
-
- /** {@inheritDoc} */
@Override public void unregister(UUID routineId, GridKernalContext ctx) {
assert routineId != null;
assert ctx != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index c90746d..f14b450 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -57,14 +57,6 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
public RegisterStatus register(UUID nodeId, UUID routineId, GridKernalContext ctx) throws IgniteCheckedException;
/**
- * Callback called after listener is registered and acknowledgement is sent.
- *
- * @param routineId Routine ID.
- * @param ctx Kernal context.
- */
- public void onListenerRegistered(UUID routineId, GridKernalContext ctx);
-
- /**
* Unregisters listener.
*
* @param routineId Routine ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 5f61051..ad7ad4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -478,11 +478,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
// Register handler only if local node passes projection predicate.
if ((item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) &&
- !locInfos.containsKey(item.routineId)) {
- if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval,
- item.autoUnsubscribe, false))
- item.hnd.onListenerRegistered(item.routineId, ctx);
- }
+ !locInfos.containsKey(item.routineId))
+ registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval,
+ item.autoUnsubscribe, false);
if (!item.autoUnsubscribe)
// Register routine locally.
@@ -509,14 +507,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ctx.resource().injectGeneric(info.prjPred);
if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
- if (registerHandler(clientNodeId,
+ registerHandler(clientNodeId,
routineId,
info.hnd,
info.bufSize,
info.interval,
info.autoUnsubscribe,
- false))
- info.hnd.onListenerRegistered(routineId, ctx);
+ false);
}
}
catch (IgniteCheckedException err) {
@@ -555,9 +552,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
GridContinuousHandler.RegisterStatus status = hnd.register(rmtInfo.nodeId, routineId, this.ctx);
assert status != GridContinuousHandler.RegisterStatus.DELAYED;
-
- if (status == GridContinuousHandler.RegisterStatus.REGISTERED)
- hnd.onListenerRegistered(routineId, this.ctx);
}
}
}
@@ -649,8 +643,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
try {
registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true);
- hnd.onListenerRegistered(routineId, ctx);
-
return new GridFinishedFuture<>(routineId);
}
catch (IgniteCheckedException e) {
@@ -700,9 +692,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
startFuts.put(routineId, fut);
try {
- if (locIncluded
- && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true))
- hnd.onListenerRegistered(routineId, ctx);
+ if (locIncluded || hnd.isQuery())
+ registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true);
ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData,
reqData.handler().keepBinary()));
@@ -1020,8 +1011,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
data.autoUnsubscribe()));
}
- boolean registered = false;
-
if (err == null) {
try {
IgnitePredicate<ClusterNode> prjPred = data.projectionPredicate();
@@ -1030,10 +1019,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ctx.resource().injectGeneric(prjPred);
if ((prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) &&
- !locInfos.containsKey(routineId)) {
- registered = registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(),
+ !locInfos.containsKey(routineId))
+ registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(),
data.autoUnsubscribe(), false);
- }
if (!data.autoUnsubscribe())
// Register routine locally.
@@ -1061,9 +1049,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (err != null)
req.addError(ctx.localNodeId(), err);
-
- if (registered)
- hnd0.onListenerRegistered(routineId, ctx);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
index 7000446..cf0c0d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
@@ -17,9 +17,17 @@
package org.apache.ignite.internal.processors.cache.query.continuous;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
import javax.cache.configuration.Factory;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.event.CacheEntryCreatedListener;
@@ -33,9 +41,12 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -45,8 +56,10 @@ import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jsr166.ConcurrentHashMap8;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/** */
@SuppressWarnings("unchecked")
@@ -57,13 +70,21 @@ public class GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA
/** */
private static final int SERVER_GRIDS_COUNT = 6;
+ /** */
+ public static final int KEYS = 2_000;
+
/** Cache entry operations' counts. */
private static final ConcurrentMap<String, AtomicInteger> opCounts = new ConcurrentHashMap8<>();
+ /** Client. */
+ private static boolean client = false;
+
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
+ client = false;
+
super.afterTest();
}
@@ -122,6 +143,108 @@ public class GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA
}
}
+ /** Checks that continuous queries on a cache with a node filter lose no events and deliver no duplicates.
+ * @throws Exception If failed.
+ */
+ public void testWithNodeFilter() throws Exception {
+ List<QueryCursor> qryCursors = new ArrayList<>();
+
+ final int nodesCnt = 3;
+
+ startGridsMultiThreaded(nodesCnt);
+
+ awaitPartitionMapExchange();
+
+ CacheConfiguration ccfg = cacheConfiguration(new NodeFilterByRegexp(".*(0|1)$"));
+
+ grid(0).createCache(ccfg);
+
+ final AtomicInteger cntr = new AtomicInteger();
+
+ final ConcurrentMap<ClusterNode, Set<Integer>> maps = new ConcurrentHashMap<>(); // Values seen, per listening node.
+
+ final AtomicBoolean doubleNtfFail = new AtomicBoolean(false); // Becomes true once any node sees a value twice.
+
+ CacheEntryUpdatedListener<Integer, Integer> lsnr = new CacheEntryUpdatedListener<Integer, Integer>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts)
+ throws CacheEntryListenerException {
+ for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+ cntr.incrementAndGet();
+
+ ClusterNode node = ((Ignite)e.getSource().unwrap(Ignite.class)).cluster().localNode();
+
+ Set<Integer> set = maps.get(node);
+
+ if (set == null) {
+ set = new ConcurrentSkipListSet<>();
+
+ Set<Integer> oldVal = maps.putIfAbsent(node, set);
+
+ set = oldVal != null ? oldVal : set;
+ }
+
+ if (!set.add(e.getValue()))
+ doubleNtfFail.set(true); // Value already recorded for this node: flag the duplicate so the assertion below can fail.
+ }
+ }
+ };
+
+ for (int i = 0; i < nodesCnt; i++) {
+ ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ Ignite ignite = grid(i);
+
+ log.info("Try to start CQ on node: " + ignite.cluster().localNode().id());
+
+ qryCursors.add(ignite.cache(ccfg.getName()).query(qry));
+
+ log.info("CQ started on node: " + ignite.cluster().localNode().id());
+ }
+
+ client = true; // The next grid started picks this flag up in getConfiguration().
+
+ startGrid(nodesCnt);
+
+ awaitPartitionMapExchange();
+
+ ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ qryCursors.add(grid(nodesCnt).cache(ccfg.getName()).query(qry));
+
+ for (int i = 0; i <= nodesCnt; i++) {
+ for (int key = 0; key < KEYS; key++) {
+ int val = (i * KEYS) + key; // Distinct key/value range per node, so every put creates a unique value.
+
+ grid(i).cache(ccfg.getName()).put(val, val);
+ }
+ }
+
+ assertTrue(GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return cntr.get() >= 2 * (nodesCnt + 1) * KEYS;
+ }
+ }, 5000L));
+
+ assertFalse("Got duplicate", doubleNtfFail.get());
+
+ for (int i = 0; i < (nodesCnt + 1) * KEYS; i++) {
+ for (Map.Entry<ClusterNode, Set<Integer>> e : maps.entrySet())
+ assertTrue("Lost event on node: " + e.getKey().id() + ", event: " + i, e.getValue().remove(i));
+ }
+
+ for (Map.Entry<ClusterNode, Set<Integer>> e : maps.entrySet())
+ assertTrue("Unexpected event on node: " + e.getKey(), e.getValue().isEmpty());
+
+ assertEquals("Not expected count of CQ", nodesCnt + 1, qryCursors.size());
+
+ for (QueryCursor cur : qryCursors)
+ cur.close();
+ }
+
/** */
private Ignite startGrid(final int idx, boolean isClientMode) throws Exception {
String gridName = getTestGridName(idx);
@@ -179,6 +302,28 @@ public class GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA
return node;
}
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setClientMode(client); // Client mode follows the static 'client' flag as it is when the configuration is built.
+
+ return cfg;
+ }
+
+ /** Builds the configuration of the cache named "test-cache-cq".
+ * @param filter Node filter passed to {@code setNodeFilter}.
+ * @return Cache configuration with one backup, ATOMIC atomicity, FULL_SYNC write synchronization and PARTITIONED mode.
+ */
+ private CacheConfiguration cacheConfiguration(NodeFilterByRegexp filter) {
+ return new CacheConfiguration("test-cache-cq")
+ .setBackups(1)
+ .setNodeFilter(filter)
+ .setAtomicityMode(ATOMIC)
+ .setWriteSynchronizationMode(FULL_SYNC)
+ .setCacheMode(PARTITIONED);
+ }
+
/** */
private final static class ListenerConfiguration extends MutableCacheEntryListenerConfiguration {
/** Operation. */
@@ -275,4 +420,20 @@ public class GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA
return ((Integer)clusterNode.attributes().get("idx") % 2) == idx % 2;
}
}
+
+ /** Node predicate that accepts a node when the string form of its ID fully matches a regular expression. */
+ private final static class NodeFilterByRegexp implements IgnitePredicate<ClusterNode> {
+ /** Pattern compiled from the constructor argument. */
+ private final Pattern pattern;
+
+ /** @param regExp Regular expression, compiled once at construction. */
+ private NodeFilterByRegexp(String regExp) {
+ this.pattern = Pattern.compile(regExp);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode clusterNode) {
+ return pattern.matcher(clusterNode.id().toString()).matches(); // matches() requires the whole ID string to match.
+ }
+ }
}
[06/12] ignite git commit: IGNITE-1525 Return value for cache
operation can be lost with onePhaseCommit
Posted by vo...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index e67e60f..a5b2202 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
@@ -44,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFini
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
@@ -175,6 +177,12 @@ public class IgniteTxHandler {
}
});
+ ctx.io().addHandler(0, GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() {
+ @Override public void apply(UUID nodeId, GridCacheMessage msg) {
+ processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg);
+ }
+ });
+
ctx.io().addHandler(0, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse)msg);
@@ -882,7 +890,7 @@ public class IgniteTxHandler {
* @param nodeId Sender node ID.
* @param req Request.
*/
- protected final void processDhtTxPrepareRequest(UUID nodeId, GridDhtTxPrepareRequest req) {
+ protected final void processDhtTxPrepareRequest(final UUID nodeId, final GridDhtTxPrepareRequest req) {
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Received dht prepare request [txId=" + req.nearXidVersion() +
", dhtTxId=" + req.version() +
@@ -918,14 +926,15 @@ public class IgniteTxHandler {
if (dhtTx != null) {
dhtTx.onePhaseCommit(true);
+ dhtTx.needReturnValue(req.needReturnValue());
- finish(nodeId, dhtTx, req);
+ finish(dhtTx, req);
}
if (nearTx != null) {
nearTx.onePhaseCommit(true);
- finish(nodeId, nearTx, req);
+ finish(nearTx, req);
}
}
}
@@ -950,38 +959,60 @@ public class IgniteTxHandler {
req.deployInfo() != null);
}
- try {
- // Reply back to sender.
- ctx.io().send(nodeId, res, req.policy());
+ if (req.onePhaseCommit()) {
+ IgniteInternalFuture completeFut;
- if (txPrepareMsgLog.isDebugEnabled()) {
- txPrepareMsgLog.debug("Sent dht prepare response [txId=" + req.nearXidVersion() +
- ", dhtTxId=" + req.version() +
- ", node=" + nodeId + ']');
- }
- }
- catch (IgniteCheckedException e) {
- if (e instanceof ClusterTopologyCheckedException) {
- if (txPrepareMsgLog.isDebugEnabled()) {
- txPrepareMsgLog.debug("Failed to send dht prepare response, node left [txId=" + req.nearXidVersion() +
- ", dhtTxId=" + req.version() +
- ", node=" + nodeId + ']');
- }
- }
- else {
- U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [" +
- "txId=" + req.nearXidVersion() +
- ", dhtTxId=" + req.version() +
- ", node=" + nodeId +
- ", err=" + e.getMessage() + ']');
+ IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ?
+ null : dhtTx.done() ? null : dhtTx.finishFuture();
+
+ final IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ?
+ null : nearTx.done() ? null : nearTx.finishFuture();
+
+ if (dhtFin != null && nearFin != null) {
+ GridCompoundFuture fut = new GridCompoundFuture();
+
+ fut.add(dhtFin);
+ fut.add(nearFin);
+
+ fut.markInitialized();
+
+ completeFut = fut;
}
+ else
+ completeFut = dhtFin != null ? dhtFin : nearFin;
- if (nearTx != null)
- nearTx.rollback();
+ if (completeFut != null) {
+ final GridDhtTxPrepareResponse res0 = res;
+ final GridDhtTxRemote dhtTx0 = dhtTx;
+ final GridNearTxRemote nearTx0 = nearTx;
- if (dhtTx != null)
- dhtTx.rollback();
+ completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+ @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+ sendReply(nodeId, req, res0, dhtTx0, nearTx0);
+ }
+ });
+ }
+ else
+ sendReply(nodeId, req, res, dhtTx, nearTx);
}
+ else
+ sendReply(nodeId, req, res, dhtTx, nearTx);
+ }
+
+ /** Processes a one-phase-commit ack request by calling {@code removeTxReturn} on the tx manager for each version in it.
+ * @param nodeId Node ID (asserted non-null; used here only in the debug message).
+ * @param req Request carrying the versions to process (asserted non-null).
+ */
+ protected final void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId,
+ final GridDhtTxOnePhaseCommitAckRequest req) {
+ assert nodeId != null;
+ assert req != null;
+
+ if (log.isDebugEnabled())
+ log.debug("Processing dht tx one phase commit ack request [nodeId=" + nodeId + ", req=" + req + ']');
+
+ for (GridCacheVersion ver : req.versions())
+ ctx.tm().removeTxReturn(ver); // NOTE(review): presumably drops the return value kept for this tx version - confirm in IgniteTxManager.
+ }
/**
@@ -1139,12 +1170,10 @@ public class IgniteTxHandler {
}
/**
- * @param nodeId Node ID.
* @param tx Transaction.
* @param req Request.
*/
protected void finish(
- UUID nodeId,
GridDistributedTxRemoteAdapter tx,
GridDhtTxPrepareRequest req) throws IgniteTxHeuristicCheckedException {
assert tx != null : "No transaction for one-phase commit prepare request: " + req;
@@ -1177,6 +1206,52 @@ public class IgniteTxHandler {
}
/**
+ * @param nodeId Node id the response is sent to.
+ * @param req Request; supplies the I/O policy for the send and the IDs used in log messages.
+ * @param res Response to send.
+ * @param dhtTx Dht tx, rolled back if the send fails (may be {@code null}).
+ * @param nearTx Near tx, rolled back if the send fails (may be {@code null}).
+ */
+ protected void sendReply(UUID nodeId,
+ GridDhtTxPrepareRequest req,
+ GridDhtTxPrepareResponse res,
+ GridDhtTxRemote dhtTx,
+ GridNearTxRemote nearTx) {
+ try {
+ // Reply back to sender.
+ ctx.io().send(nodeId, res, req.policy());
+
+ if (txPrepareMsgLog.isDebugEnabled()) {
+ txPrepareMsgLog.debug("Sent dht prepare response [txId=" + req.nearXidVersion() +
+ ", dhtTxId=" + req.version() +
+ ", node=" + nodeId + ']');
+ }
+ }
+ catch (IgniteCheckedException e) { // Any send failure is logged, then both transactions are rolled back.
+ if (e instanceof ClusterTopologyCheckedException) { // Logged at debug level only, with a "node left" message.
+ if (txPrepareMsgLog.isDebugEnabled()) {
+ txPrepareMsgLog.debug("Failed to send dht prepare response, node left [txId=" + req.nearXidVersion() +
+ ", dhtTxId=" + req.version() +
+ ", node=" + nodeId + ']');
+ }
+ }
+ else {
+ U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [" +
+ "txId=" + req.nearXidVersion() +
+ ", dhtTxId=" + req.version() +
+ ", node=" + nodeId +
+ ", err=" + e.getMessage() + ']');
+ }
+
+ if (nearTx != null)
+ nearTx.rollback();
+
+ if (dhtTx != null)
+ dhtTx.rollback();
+ }
+ }
+
+ /**
* Sends tx finish response to remote node, if response is requested.
*
* @param nodeId Node id that originated finish request.
@@ -1191,7 +1266,26 @@ public class IgniteTxHandler {
if (req.checkCommitted()) {
res.checkCommitted(true);
- if (!committed) {
+ if (committed) {
+ if (req.needReturnValue()) {
+ try {
+ GridCacheReturnCompletableWrapper wrapper = ctx.tm().getCommittedTxReturn(req.version());
+
+ if (wrapper != null)
+ res.returnValue(wrapper.fut().get());
+ else
+ assert !ctx.discovery().alive(nodeId) : nodeId;
+ }
+ catch (IgniteCheckedException e) {
+ if (txFinishMsgLog.isDebugEnabled()) {
+ txFinishMsgLog.debug("Failed to gain entry processor return value. [txId=" + nearTxId +
+ ", dhtTxId=" + req.version() +
+ ", node=" + nodeId + ']');
+ }
+ }
+ }
+ }
+ else {
ClusterTopologyCheckedException cause =
new ClusterTopologyCheckedException("Primary node left grid.");
@@ -1492,8 +1586,7 @@ public class IgniteTxHandler {
* @param req Request.
*/
protected void processCheckPreparedTxRequest(final UUID nodeId,
- final GridCacheTxRecoveryRequest req)
- {
+ final GridCacheTxRecoveryRequest req) {
if (txRecoveryMsgLog.isDebugEnabled()) {
txRecoveryMsgLog.debug("Received tx recovery request [txId=" + req.nearXidVersion() +
", node=" + nodeId + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 637f322..fe69536 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -151,9 +151,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
/** Commit error. */
protected volatile Throwable commitErr;
- /** Need return value. */
- protected boolean needRetVal;
-
/** Implicit transaction result. */
protected GridCacheReturn implicitRes;
@@ -355,13 +352,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
/**
- * @return Flag indicating whether transaction needs return value.
- */
- public boolean needReturnValue() {
- return needRetVal;
- }
-
- /**
* @return {@code True} if transaction participates in a cache that has an interceptor configured.
*/
public boolean hasInterceptor() {
@@ -369,13 +359,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
/**
- * @param needRetVal Need return value flag.
- */
- public void needReturnValue(boolean needRetVal) {
- this.needRetVal = needRetVal;
- }
-
- /**
* @param snd {@code True} if values in tx entries should be replaced with transformed values and sent
* to remote nodes.
*/
@@ -703,7 +686,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
txEntry.cached().unswap(false);
IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(txEntry,
- true);
+ true, null);
GridCacheVersion dhtVer = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index f9357f9..a1580a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -39,6 +39,7 @@ import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -49,7 +50,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
@@ -57,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLo
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
@@ -87,8 +92,11 @@ import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
import org.jsr166.ConcurrentLinkedHashMap;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS;
@@ -123,6 +131,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/** Tx salvage timeout (default 3s). */
private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(IGNITE_TX_SALVAGE_TIMEOUT, 100);
+ /** One phase commit deferred ack request timeout. */
+ public static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT =
+ Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT, 500);
+
+ /** One phase commit deferred ack request buffer size. */
+ private static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE =
+ Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE, 256);
+
/** Version in which deadlock detection introduced. */
public static final IgniteProductVersion TX_DEADLOCK_DETECTION_SINCE = IgniteProductVersion.fromString("1.5.19");
@@ -160,7 +176,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT));
/** Committed local transactions. */
- private final ConcurrentLinkedHashMap<GridCacheVersion, Boolean> completedVersHashMap =
+ private final ConcurrentLinkedHashMap<GridCacheVersion, Object> completedVersHashMap =
new ConcurrentLinkedHashMap<>(
Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT),
0.75f,
@@ -168,6 +184,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT),
PER_SEGMENT_Q);
+ /** Pending one phase commit ack requests sender. */
+ private GridDeferredAckMessageSender deferredAckMessageSender;
+
/** Transaction finish synchronizer. */
private GridCacheTxFinishSync txFinishSync;
@@ -209,6 +228,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
for (TxDeadlockFuture fut : deadlockDetectFuts.values())
fut.onNodeLeft(nodeId);
+
+ for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) {
+ Object obj = entry.getValue();
+
+ if (obj instanceof GridCacheReturnCompletableWrapper &&
+ nodeId.equals(((GridCacheReturnCompletableWrapper)obj).nodeId()))
+ removeTxReturn(entry.getKey());
+ }
}
},
EVT_NODE_FAILED, EVT_NODE_LEFT);
@@ -237,6 +264,33 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
txFinishSync = new GridCacheTxFinishSync<>(cctx);
txHnd = new IgniteTxHandler(cctx);
+
+ deferredAckMessageSender = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) {
+ @Override public int getTimeout() {
+ return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
+ }
+
+ @Override public int getBufferSize() {
+ return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE;
+ }
+
+ @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers) {
+ GridDhtTxOnePhaseCommitAckRequest ackReq = new GridDhtTxOnePhaseCommitAckRequest(vers);
+
+ cctx.kernalContext().gateway().readLock();
+
+ try {
+ cctx.io().send(nodeId, ackReq, GridIoPolicy.SYSTEM_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Failed to send one phase commit ack to backup node [backup=" +
+ nodeId + ']', e);
+ }
+ finally {
+ cctx.kernalContext().gateway().readUnlock();
+ }
+ }
+ };
}
/** {@inheritDoc} */
@@ -898,9 +952,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
*/
public void addCommittedTx(IgniteInternalTx tx) {
addCommittedTx(tx, tx.xidVersion(), tx.nearXidVersion());
+ }
- if (!tx.local() && !tx.near() && tx.onePhaseCommit())
- addCommittedTx(tx, tx.nearXidVersion(), null);
+ /**
+ * @param tx Committed transaction.
+ */
+ public void addCommittedTxReturn(IgniteInternalTx tx, GridCacheReturnCompletableWrapper ret) {
+ addCommittedTxReturn(tx.nearXidVersion(), null, ret);
}
/**
@@ -925,7 +983,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (nearXidVer != null)
xidVer = new CommittedVersion(xidVer, nearXidVer);
- Boolean committed0 = completedVersHashMap.putIfAbsent(xidVer, true);
+ Object committed0 = completedVersHashMap.putIfAbsent(xidVer, true);
if (committed0 == null && (tx == null || tx.needsCompletedVersions())) {
Boolean b = completedVersSorted.putIfAbsent(xidVer, true);
@@ -933,7 +991,29 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
assert b == null;
}
- return committed0 == null || committed0;
+ Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE);
+
+ return committed0 == null || committed;
+ }
+
+ /**
+ * @param xidVer Completed transaction version.
+ * @param nearXidVer Optional near transaction ID.
+ * @param retVal Invoke result.
+ */
+ private void addCommittedTxReturn(
+ GridCacheVersion xidVer,
+ @Nullable GridCacheVersion nearXidVer,
+ GridCacheReturnCompletableWrapper retVal
+ ) {
+ assert retVal != null;
+
+ if (nearXidVer != null)
+ xidVer = new CommittedVersion(xidVer, nearXidVer);
+
+ Object prev = completedVersHashMap.putIfAbsent(xidVer, retVal);
+
+ assert prev == null || Boolean.FALSE.equals(prev) : prev; // Can be rolled back.
}
/**
@@ -945,7 +1025,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
IgniteInternalTx tx,
GridCacheVersion xidVer
) {
- Boolean committed0 = completedVersHashMap.putIfAbsent(xidVer, false);
+ Object committed0 = completedVersHashMap.putIfAbsent(xidVer, false);
if (committed0 == null && (tx == null || tx.needsCompletedVersions())) {
Boolean b = completedVersSorted.putIfAbsent(xidVer, false);
@@ -953,7 +1033,47 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
assert b == null;
}
- return committed0 == null || !committed0;
+ Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE);
+
+ return committed0 == null || !committed;
+ }
+
+ /**
+ * @param xidVer Completed transaction version.
+ * @return Tx result.
+ */
+ public GridCacheReturnCompletableWrapper getCommittedTxReturn(GridCacheVersion xidVer) {
+ Object retVal = completedVersHashMap.get(xidVer);
+
+ // Will gain true in regular case or GridCacheReturn in onePhaseCommit case.
+ if (!Boolean.TRUE.equals(retVal)) {
+ assert !Boolean.FALSE.equals(retVal); // Method should be used only after 'committed' checked.
+
+ GridCacheReturnCompletableWrapper res = (GridCacheReturnCompletableWrapper)retVal;
+
+ removeTxReturn(xidVer);
+
+ return res;
+ }
+ else
+ return null;
+ }
+
+ /**
+ * @param xidVer Completed transaction version.
+ */
+ public void removeTxReturn(GridCacheVersion xidVer) {
+ Object prev = completedVersHashMap.get(xidVer);
+
+ if (Boolean.FALSE.equals(prev)) // Tx can be rolled back.
+ return;
+
+ assert prev instanceof GridCacheReturnCompletableWrapper:
+ prev + " instead of GridCacheReturnCompletableWrapper";
+
+ boolean res = completedVersHashMap.replace(xidVer, prev, true);
+
+ assert res;
}
/**
@@ -1086,7 +1206,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* so we don't do it here.
*/
- Boolean committed = completedVersHashMap.get(tx.xidVersion());
+ Object committed0 = completedVersHashMap.get(tx.xidVersion());
+
+ Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE);
// 1. Make sure that committed version has been recorded.
if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
@@ -1672,12 +1794,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
boolean committed = false;
- for (Map.Entry<GridCacheVersion, Boolean> entry : completedVersHashMap.entrySet()) {
+ for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) {
if (entry.getKey() instanceof CommittedVersion) {
CommittedVersion comm = (CommittedVersion)entry.getKey();
if (comm.nearVer.equals(xidVer)) {
- committed = entry.getValue();
+ committed = !entry.getValue().equals(Boolean.FALSE);
break;
}
@@ -1809,8 +1931,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
// Not all transactions were found. Need to scan committed versions to check
// if transaction was already committed.
- for (Map.Entry<GridCacheVersion, Boolean> e : completedVersHashMap.entrySet()) {
- if (!e.getValue())
+ for (Map.Entry<GridCacheVersion, Object> e : completedVersHashMap.entrySet()) {
+ if (e.getValue().equals(Boolean.FALSE))
continue;
GridCacheVersion ver = e.getKey();
@@ -2137,6 +2259,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param nodeId Node ID to send message to.
+ * @param ver Version to ack.
+ */
+ public void sendDeferredAckResponse(UUID nodeId, GridCacheVersion ver) {
+ deferredAckMessageSender.sendDeferredAckMessage(nodeId, ver);
+ }
+
+ /**
* @return Collection of active transaction deadlock detection futures.
*/
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index e611723..c3d194b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,7 +31,6 @@ import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
-
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
@@ -43,8 +43,10 @@ import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -59,6 +61,7 @@ import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager.DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
import static org.apache.ignite.testframework.GridTestUtils.TestMemoryMode;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
@@ -70,7 +73,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
- private static final long DURATION = 60_000;
+ protected static final long DURATION = 60_000;
/** */
protected static final int GRID_CNT = 4;
@@ -78,8 +81,8 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
/**
* @return Keys count for the test.
*/
- private int keysCount() {
- return 10_000;
+ protected int keysCount() {
+ return 2_000;
}
/**
@@ -249,12 +252,17 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
IgniteInternalFuture<Object> fut = runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
+ Random rnd = new Random();
+
while (!finished.get()) {
stopGrid(3);
U.sleep(300);
startGrid(3);
+
+ if (rnd.nextBoolean()) // OPC possible only when there is no migration from one backup to another.
+ awaitPartitionMapExchange();
}
return null;
@@ -456,6 +464,29 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
assertTrue("Unexpected atomic futures: " + futs, futs.isEmpty());
}
+
+ checkOnePhaseCommitReturnValuesCleaned();
+ }
+
+ /**
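+ * Asserts that every value in {@code completedVersHashMap} on each grid node is a {@code Boolean}, i.e. one-phase-commit return values were cleaned up.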
+ */
+ protected void checkOnePhaseCommitReturnValuesCleaned() throws IgniteInterruptedCheckedException {
+ U.sleep(DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT);
+
+ for (int i = 0; i < GRID_CNT; i++) {
+ IgniteKernal ignite = (IgniteKernal)grid(i);
+
+ IgniteTxManager tm = ignite.context().cache().context().tm();
+
+ Map completedVersHashMap = U.field(tm, "completedVersHashMap");
+
+ for (Object o : completedVersHashMap.values()) {
+ assertTrue("completedVersHashMap contains" + o.getClass() + " instead of boolean. " +
+ "These values should be replaced by boolean after onePhaseCommit finished. " +
+ "[node=" + i + "]", o instanceof Boolean);
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index 9204bc8..9bfde27 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.HashSet;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
@@ -88,16 +89,6 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
}
}
- /** {@inheritDoc} */
- @Override public void testGetAndPut() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-1525");
- }
-
- /** {@inheritDoc} */
- @Override public void testInvoke() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-1525");
- }
-
/**
* @throws Exception If failed.
*/
@@ -217,6 +208,70 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
}
/**
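+ * Restarts node 0 while invokes are issued through it, then checks that one-phase-commit return values were cleaned up.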
+ */
+ public void testOriginatingNodeFailureForcesOnePhaseCommitDataCleanup() throws Exception {
+ ignite(0).createCache(cacheConfiguration(TestMemoryMode.HEAP, false));
+
+ final AtomicBoolean finished = new AtomicBoolean();
+
+ final int keysCnt = keysCount();
+
+ IgniteInternalFuture<Object> fut = runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ Random rnd = new Random();
+
+ while (!finished.get()) {
+ stopGrid(0);
+
+ U.sleep(300);
+
+ startGrid(0);
+
+ if (rnd.nextBoolean()) // OPC possible only when there is no migration from one backup to another.
+ awaitPartitionMapExchange();
+ }
+
+ return null;
+ }
+ });
+
+ IgniteInternalFuture<Object> fut2 = runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ int iter = 0;
+
+ while (!finished.get()) {
+ try {
+ IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
+
+ Integer val = ++iter;
+
+ for (int i = 0; i < keysCnt; i++)
+ cache.invoke(i, new SetEntryProcessor(val));
+ }
+ catch (Exception e) {
+ // No-op.
+ }
+ }
+
+ return null;
+ }
+ });
+
+ try {
+ U.sleep(DURATION);
+ }
+ finally {
+ finished.set(true);
+
+ fut.get();
+ fut2.get();
+ }
+
+ checkOnePhaseCommitReturnValuesCleaned();
+ }
+
+ /**
* Callable to process inside transaction.
*/
private static class ProcessCallable implements Callable<Void> {
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-client-mode.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-client-mode.properties b/modules/yardstick/config/benchmark-client-mode.properties
index ba5525f..f7c8347 100644
--- a/modules/yardstick/config/benchmark-client-mode.properties
+++ b/modules/yardstick/config/benchmark-client-mode.properties
@@ -70,6 +70,8 @@ CONFIGS="\
-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutBenchmark -sn IgniteNode -ds ${ver}atomic-put-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutGetBenchmark -sn IgniteNode -ds ${ver}atomic-put-get-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds ${ver}tx-put-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds ${ver}tx-getAndPut-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds ${ver}tx-invoke-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds ${ver}tx-put-get-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteSqlQueryBenchmark -sn IgniteNode -ds ${ver}sql-query-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteSqlQueryJoinBenchmark -sn IgniteNode -ds ${ver}sql-query-join-1-backup,\
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-tx-win.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-tx-win.properties b/modules/yardstick/config/benchmark-tx-win.properties
index 73b857d..54a40b1 100644
--- a/modules/yardstick/config/benchmark-tx-win.properties
+++ b/modules/yardstick/config/benchmark-tx-win.properties
@@ -54,6 +54,8 @@ set DRIVER_HOSTS=localhost
:: Note that each benchmark is set to run for 300 seconds (5 mins) with warm-up set to 60 seconds (1 minute).
set CONFIGS=^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds tx-put-1-backup,^
+-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds tx-getAndPut-1-backup,^
+-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds tx-invoke-1-backup,^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapBenchmark -sn IgniteNode -ds tx-put-offheap-1-backup,^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapValuesBenchmark -sn IgniteNode -ds tx-put-offheap-val-1-backup,^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds tx-put-get-1-backup,^
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-tx.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-tx.properties b/modules/yardstick/config/benchmark-tx.properties
index f3dbc24..0d5bb02 100644
--- a/modules/yardstick/config/benchmark-tx.properties
+++ b/modules/yardstick/config/benchmark-tx.properties
@@ -59,6 +59,8 @@ nodesNum=$((`echo ${SERVER_HOSTS} | tr ',' '\n' | wc -l` + `echo ${DRIVER_HOSTS}
# Note that each benchmark is set to run for 300 seconds (5 mins) with warm-up set to 60 seconds (1 minute).
CONFIGS="\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds tx-put-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds tx-getAndPut-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds tx-invoke-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapBenchmark -sn IgniteNode -ds tx-put-offheap-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapValuesBenchmark -sn IgniteNode -ds tx-put-offheap-val-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds tx-put-get-1-backup,\
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-win.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-win.properties b/modules/yardstick/config/benchmark-win.properties
index b6ecd67..b75b5d6 100644
--- a/modules/yardstick/config/benchmark-win.properties
+++ b/modules/yardstick/config/benchmark-win.properties
@@ -59,6 +59,8 @@ set CONFIGS=^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutBenchmark -sn IgniteNode -ds atomic-put-1-backup,^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetBenchmark -sn IgniteNode -ds atomic-put-get-1-backup,^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds tx-put-1-backup,^
+-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds tx-getAndPut-1-backup,^
+-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds tx-invoke-1-backup,^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds tx-put-get-1-backup,^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryBenchmark -sn IgniteNode -ds sql-query-1-backup,^
-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryJoinBenchmark -sn IgniteNode -ds sql-query-join-1-backup,^
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark.properties b/modules/yardstick/config/benchmark.properties
index 67ef5ef..cfc1499 100644
--- a/modules/yardstick/config/benchmark.properties
+++ b/modules/yardstick/config/benchmark.properties
@@ -71,6 +71,8 @@ CONFIGS="\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutBenchmark -sn IgniteNode -ds ${ver}atomic-put-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetBenchmark -sn IgniteNode -ds ${ver}atomic-put-get-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds ${ver}tx-put-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds ${ver}tx-getAndPut-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds ${ver}tx-invoke-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds ${ver}tx-put-get-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryBenchmark -sn IgniteNode -ds ${ver}sql-query-1-backup,\
-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryJoinBenchmark -sn IgniteNode -ds ${ver}sql-query-join-1-backup,\
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
new file mode 100644
index 0000000..40e563c
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+
+/**
+ * Ignite benchmark that performs getAndPut operations.
+ */
+public class IgniteGetAndPutBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
+ /** {@inheritDoc} */
+ @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+ int key = nextRandom(args.range());
+
+ cache.getAndPut(key, new SampleValue(key));
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteCache<Integer, Object> cache() {
+ return ignite().cache("atomic");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
new file mode 100644
index 0000000..49ae985
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.yardstick.IgniteBenchmarkUtils;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
+
+/**
+ * Ignite benchmark that performs getAndPut operations inside transactions.
+ */
+public class IgniteGetAndPutTxBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
+ /** */
+ private IgniteTransactions txs;
+
+ /** */
+ private Callable<Void> clo;
+
+ /** {@inheritDoc} */
+ @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+ super.setUp(cfg);
+
+ if (!IgniteSystemProperties.getBoolean("SKIP_MAP_CHECK"))
+ ignite().compute().broadcast(new WaitMapExchangeFinishCallable());
+
+ txs = ignite().transactions();
+
+ clo = new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ int key = nextRandom(args.range());
+
+ cache.getAndPut(key, new SampleValue(key));
+
+ return null;
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+ IgniteBenchmarkUtils.doInTransaction(txs, args.txConcurrency(), args.txIsolation(), clo);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteCache<Integer, Object> cache() {
+ return ignite().cache("tx");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java
index 8f05598..64dc6b8 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java
@@ -17,12 +17,52 @@
package org.apache.ignite.yardstick.cache;
+import java.util.Map;
+import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.yardstick.IgniteBenchmarkUtils;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
/**
* Ignite benchmark that performs invoke operations.
*/
public class IgniteInvokeTxBenchmark extends IgniteInvokeBenchmark {
+ /** */
+ private IgniteTransactions txs;
+
+ /** */
+ private Callable<Void> clo;
+
+ /** {@inheritDoc} Also broadcasts a {@code WaitMapExchangeFinishCallable} unless SKIP_MAP_CHECK is set, then builds the invoke closure. */
+ @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+ super.setUp(cfg);
+
+ if (!IgniteSystemProperties.getBoolean("SKIP_MAP_CHECK"))
+ ignite().compute().broadcast(new WaitMapExchangeFinishCallable());
+
+ txs = ignite().transactions();
+
+ clo = new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ int key = nextRandom(args.range());
+
+ cache.invoke(key, new SetValueEntryProcessor(new SampleValue(key)));
+
+ return null;
+ }
+ };
+ }
+
+ /** {@inheritDoc} Passes the closure {@code clo} to {@code doInTransaction} with the configured concurrency and isolation; always returns {@code true}. */
+ @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+ IgniteBenchmarkUtils.doInTransaction(txs, args.txConcurrency(), args.txIsolation(), clo);
+
+ return true;
+ }
+
/** {@inheritDoc} */
@Override protected IgniteCache<Integer, Object> cache() {
return ignite().cache("tx");
[07/12] ignite git commit: IGNITE-1525 Return value for cache
operation can be lost with onePhaseCommit
Posted by vo...@apache.org.
IGNITE-1525 Return value for cache operation can be lost with onePhaseCommit
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9b72d18d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9b72d18d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9b72d18d
Branch: refs/heads/ignite-1.6.8-hadoop
Commit: 9b72d18dd94ec1383653f00474c102804c02790a
Parents: c3eff6b
Author: Anton Vinogradov <av...@apache.org>
Authored: Mon Sep 19 18:07:20 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Mon Sep 19 18:07:20 2016 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 12 +
.../communication/GridIoMessageFactory.java | 6 +
.../GridCacheReturnCompletableWrapper.java | 101 +++++++++
.../cache/GridDeferredAckMessageSender.java | 219 ++++++++++++++++++
.../GridDistributedTxRemoteAdapter.java | 59 +++--
.../distributed/dht/GridDhtTxFinishFuture.java | 12 +-
.../distributed/dht/GridDhtTxFinishRequest.java | 33 ++-
.../dht/GridDhtTxFinishResponse.java | 52 ++++-
.../dht/GridDhtTxOnePhaseCommitAckRequest.java | 134 +++++++++++
.../distributed/dht/GridDhtTxPrepareFuture.java | 6 +-
.../dht/GridDhtTxPrepareRequest.java | 93 +++++---
.../cache/distributed/dht/GridDhtTxRemote.java | 6 +-
.../dht/atomic/GridDhtAtomicCache.java | 227 +++++--------------
...arOptimisticSerializableTxPrepareFuture.java | 4 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 7 +-
.../GridNearPessimisticTxPrepareFuture.java | 4 +-
.../near/GridNearTxFinishFuture.java | 112 +++++++--
.../cache/transactions/IgniteTxAdapter.java | 46 +++-
.../cache/transactions/IgniteTxHandler.java | 163 ++++++++++---
.../transactions/IgniteTxLocalAdapter.java | 19 +-
.../cache/transactions/IgniteTxManager.java | 154 ++++++++++++-
.../IgniteCachePutRetryAbstractSelfTest.java | 39 +++-
...gniteCachePutRetryTransactionalSelfTest.java | 75 +++++-
.../config/benchmark-client-mode.properties | 2 +
.../config/benchmark-tx-win.properties | 2 +
.../yardstick/config/benchmark-tx.properties | 2 +
.../yardstick/config/benchmark-win.properties | 2 +
modules/yardstick/config/benchmark.properties | 2 +
.../cache/IgniteGetAndPutBenchmark.java | 41 ++++
.../cache/IgniteGetAndPutTxBenchmark.java | 70 ++++++
.../cache/IgniteInvokeTxBenchmark.java | 40 ++++
31 files changed, 1405 insertions(+), 339 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 7c428a6..ab6403f 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -290,6 +290,18 @@ public final class IgniteSystemProperties {
public static final String IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT = "IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT";
/**
+ * One phase commit deferred ack request timeout.
+ */
+ public static final String IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT =
+ "IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT";
+
+ /**
+ * One phase commit deferred ack request buffer size.
+ */
+ public static final String IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE =
+ "IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE";
+
+ /**
* If this property set then debug console will be opened for H2 indexing SPI.
*/
public static final String IGNITE_H2_DEBUG_CONSOLE = "IGNITE_H2_DEBUG_CONSOLE";
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 5f60215..8b8a734 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRe
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
@@ -160,6 +161,11 @@ public class GridIoMessageFactory implements MessageFactory {
Message msg = null;
switch (type) {
+ case -27:
+ msg = new GridDhtTxOnePhaseCommitAckRequest();
+
+ break;
+
case -26:
msg = new TxLockList();
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java
new file mode 100644
index 0000000..8ceaf71
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Holds a {@link GridCacheReturn} that may be published after a consumer has already asked for it.
+ */
+public class GridCacheReturnCompletableWrapper {
+ /** Atomic updater for the {@link #o} state field. */
+ private static final AtomicReferenceFieldUpdater<GridCacheReturnCompletableWrapper, Object> COMPLETABLE_WRAPPER_UPD =
+ AtomicReferenceFieldUpdater.newUpdater(GridCacheReturnCompletableWrapper.class, Object.class, "o");
+
+ /** State: {@code null} (untouched), a {@link GridFutureAdapter} (awaited) or a {@link GridCacheReturn} (initialized). */
+ private volatile Object o;
+
+ /** Node id. */
+ private final UUID nodeId;
+
+ /**
+ * @param nodeId Node id.
+ */
+ public GridCacheReturnCompletableWrapper(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @return ID of node initiated tx or {@code null} if this node is local.
+ */
+ @Nullable public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * Publishes the return value, completing the future handed out by {@link #fut()} if there is one.
+ *
+ * @param ret Return value to publish.
+ */
+ public void initialize(GridCacheReturn ret) {
+ final Object obj = this.o;
+
+ if (obj == null) {
+ // A failed CAS means the state changed concurrently: re-dispatch on the new state.
+ if (!COMPLETABLE_WRAPPER_UPD.compareAndSet(this, null, ret))
+ initialize(ret);
+ }
+ else if (obj instanceof GridFutureAdapter) {
+ ((GridFutureAdapter)obj).onDone(ret);
+
+ boolean res = COMPLETABLE_WRAPPER_UPD.compareAndSet(this, obj, ret);
+
+ assert res;
+ }
+ else
+ throw new IllegalStateException("GridCacheReturnCompletableWrapper can't be reinitialized");
+ }
+
+ /**
+ * Allows waiting for the properly initialized value.
+ * @return Future that completes with the value passed to {@link #initialize(GridCacheReturn)}.
+ */
+ public IgniteInternalFuture<GridCacheReturn> fut() {
+ final Object obj = this.o;
+
+ if (obj instanceof GridCacheReturn)
+ return new GridFinishedFuture<>((GridCacheReturn)obj);
+ else if (obj instanceof IgniteInternalFuture)
+ return (IgniteInternalFuture)obj;
+ else if (obj == null) {
+ GridFutureAdapter<GridCacheReturn> fut = new GridFutureAdapter<>();
+ // Return the local reference: a concurrent initialize() may already have replaced the field with the value.
+ if (COMPLETABLE_WRAPPER_UPD.compareAndSet(this, null, fut))
+ return fut;
+ else
+ return fut();
+ }
+ else
+ throw new IllegalStateException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
new file mode 100644
index 0000000..7145dc2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
+
+/**
+ * Batches versions per destination node and flushes them through {@link #finish} on buffer overflow or timeout.
+ */
+public abstract class GridDeferredAckMessageSender {
+ /** Deferred message buffers, keyed by destination node ID. */
+ private ConcurrentMap<UUID, DeferredAckMessageBuffer> deferredAckMsgBuffers = new ConcurrentHashMap8<>();
+
+ /** Timeout processor. */
+ private GridTimeoutProcessor time;
+
+ /** Closure processor. */
+ public GridClosureProcessor closure;
+
+ /**
+ * @param time Timeout processor used to register and remove buffer timeouts.
+ * @param closure Closure processor used to run timeout-triggered flushes.
+ */
+ public GridDeferredAckMessageSender(GridTimeoutProcessor time,
+ GridClosureProcessor closure) {
+ this.time = time;
+ this.closure = closure;
+ }
+
+ /**
+ * @return Timeout added to the current time to compute the end time of a newly created buffer.
+ */
+ public abstract int getTimeout();
+
+ /**
+ * @return Buffer size; a buffer is flushed once it holds more versions than this.
+ */
+ public abstract int getBufferSize();
+
+ /**
+ * Flush callback: receives the destination node ID and the versions buffered for it.
+ */
+ public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers);
+
+ /**
+ * Flushes all currently registered buffers immediately.
+ */
+ public void stop() {
+ for (DeferredAckMessageBuffer buf : deferredAckMsgBuffers.values())
+ buf.finish0();
+ }
+
+ /**
+ * @param nodeId Node ID to send message to.
+ * @param ver Version to ack.
+ */
+ public void sendDeferredAckMessage(UUID nodeId, GridCacheVersion ver) {
+ while (true) {
+ DeferredAckMessageBuffer buf = deferredAckMsgBuffers.get(nodeId);
+
+ if (buf == null) {
+ buf = new DeferredAckMessageBuffer(nodeId);
+
+ DeferredAckMessageBuffer old = deferredAckMsgBuffers.putIfAbsent(nodeId, buf);
+
+ if (old == null) {
+ // We have successfully added buffer to map.
+ time.addTimeoutObject(buf);
+ }
+ else
+ buf = old;
+ }
+
+ if (!buf.add(ver))
+ // Some thread is sending filled up buffer, we can remove it.
+ deferredAckMsgBuffers.remove(nodeId, buf);
+ else
+ break;
+ }
+ }
+
+ /**
+ * Deferred message buffer for a single node; also serves as its own timeout object.
+ */
+ private class DeferredAckMessageBuffer extends ReentrantReadWriteLock implements GridTimeoutObject {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Filled atomic flag: set by whichever of overflow or timeout claims the flush first. */
+ private AtomicBoolean guard = new AtomicBoolean(false);
+
+ /** Versions. */
+ private ConcurrentLinkedDeque8<GridCacheVersion> vers = new ConcurrentLinkedDeque8<>();
+
+ /** Node ID. */
+ private final UUID nodeId;
+
+ /** Timeout ID. */
+ private final IgniteUuid timeoutId;
+
+ /** End time. */
+ private final long endTime;
+
+ /**
+ * @param nodeId Node ID to send message to.
+ */
+ private DeferredAckMessageBuffer(UUID nodeId) {
+ this.nodeId = nodeId;
+
+ timeoutId = IgniteUuid.fromUuid(nodeId);
+
+ endTime = U.currentTimeMillis() + getTimeout();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid timeoutId() {
+ return timeoutId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long endTime() {
+ return endTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ if (guard.compareAndSet(false, true)) {
+ closure.runLocalSafe(new Runnable() {
+ @Override public void run() {
+ writeLock().lock();
+
+ try {
+ finish0();
+ }
+ finally {
+ writeLock().unlock();
+ }
+ }
+ });
+ }
+ }
+
+ /**
+ * Adds deferred request to buffer.
+ *
+ * @param ver Version to send.
+ * @return {@code True} if request was handled, {@code false} if this buffer is filled and cannot be used.
+ */
+ public boolean add(GridCacheVersion ver) {
+ readLock().lock();
+
+ boolean snd = false;
+
+ try {
+ if (guard.get())
+ return false;
+
+ vers.add(ver);
+
+ if (vers.sizex() > getBufferSize() && guard.compareAndSet(false, true))
+ snd = true;
+ }
+ finally {
+ readLock().unlock();
+ }
+
+ if (snd) {
+ // Wait all threads in read lock to finish.
+ writeLock().lock();
+
+ try {
+ finish0();
+
+ time.removeTimeoutObject(this);
+ }
+ finally {
+ writeLock().unlock();
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Sends deferred notification message and removes this buffer from pending responses map.
+ */
+ private void finish0() {
+ finish(nodeId, vers);
+
+ deferredAckMsgBuffers.remove(nodeId, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 9d9862a..4adfa8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -36,6 +36,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
@@ -448,7 +450,25 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
Map<IgniteTxKey, IgniteTxEntry> writeMap = txState.writeMap();
+ GridCacheReturnCompletableWrapper wrapper = null;
+
if (!F.isEmpty(writeMap)) {
+ GridCacheReturn ret = null;
+
+ if (!near() && !local() && onePhaseCommit()) {
+ if (needReturnValue()) {
+ ret = new GridCacheReturn(null, cctx.localNodeId().equals(otherNodeId()), true, null, true);
+
+ UUID origNodeId = otherNodeId(); // Originating node.
+
+ cctx.tm().addCommittedTxReturn(this,
+ wrapper = new GridCacheReturnCompletableWrapper(
+ !cctx.localNodeId().equals(origNodeId) ? origNodeId : null));
+ }
+ else
+ cctx.tm().addCommittedTx(this, this.nearXidVersion(), null);
+ }
+
// Register this transaction as completed prior to write-phase to
// ensure proper lock ordering for removed entries.
cctx.tm().addCommittedTx(this);
@@ -457,13 +477,13 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
batchStoreCommit(writeMap().values());
- // Node that for near transactions we grab all entries.
- for (IgniteTxEntry txEntry : (near() ? allEntries() : writeEntries())) {
- GridCacheContext cacheCtx = txEntry.context();
+ try {
+ // Note that for near transactions we grab all entries.
+ for (IgniteTxEntry txEntry : (near() ? allEntries() : writeEntries())) {
+ GridCacheContext cacheCtx = txEntry.context();
- boolean replicate = cacheCtx.isDrEnabled();
+ boolean replicate = cacheCtx.isDrEnabled();
- try {
while (true) {
try {
GridCacheEntryEx cached = txEntry.cached();
@@ -486,7 +506,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
txEntry.cached().unswap(false);
IgniteBiTuple<GridCacheOperation, CacheObject> res =
- applyTransformClosures(txEntry, false);
+ applyTransformClosures(txEntry, false, ret);
GridCacheOperation op = res.get1();
CacheObject val = res.get2();
@@ -672,21 +692,26 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
}
}
- catch (Throwable ex) {
- // In case of error, we still make the best effort to commit,
- // as there is no way to rollback at this point.
- err = new IgniteTxHeuristicCheckedException("Commit produced a runtime exception " +
- "(all transaction entries will be invalidated): " + CU.txString(this), ex);
+ }
+ catch (Throwable ex) {
+ // In case of error, we still make the best effort to commit,
+ // as there is no way to rollback at this point.
+ err = new IgniteTxHeuristicCheckedException("Commit produced a runtime exception " +
+ "(all transaction entries will be invalidated): " + CU.txString(this), ex);
- U.error(log, "Commit failed.", err);
+ U.error(log, "Commit failed.", err);
- uncommit();
+ uncommit();
- state(UNKNOWN);
+ state(UNKNOWN);
- if (ex instanceof Error)
- throw (Error)ex;
- }
+ if (ex instanceof Error)
+ throw (Error)ex;
+
+ }
+ finally {
+ if (wrapper != null)
+ wrapper.initialize(ret);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index d2e26b4..ac2ab41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -351,7 +351,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.size(),
tx.subjectId(),
tx.taskNameHash(),
- tx.activeCachesDeploymentEnabled());
+ tx.activeCachesDeploymentEnabled(),
+ false,
+ false);
try {
cctx.io().send(n, req, tx.ioPolicy());
@@ -448,7 +450,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.subjectId(),
tx.taskNameHash(),
tx.activeCachesDeploymentEnabled(),
- updCntrs);
+ updCntrs,
+ false,
+ false);
req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion());
@@ -516,7 +520,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.size(),
tx.subjectId(),
tx.taskNameHash(),
- tx.activeCachesDeploymentEnabled());
+ tx.activeCachesDeploymentEnabled(),
+ false,
+ false);
req.writeVersion(tx.writeVersion());
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 2d98e0d..c618a18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -46,6 +46,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** */
public static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
+ /** */
+ public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x02;
+
/** Near node ID. */
private UUID nearNodeId;
@@ -141,7 +144,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
int txSize,
@Nullable UUID subjId,
int taskNameHash,
- boolean addDepInfo
+ boolean addDepInfo,
+ boolean retVal,
+ boolean waitRemoteTxs
) {
super(
xidVer,
@@ -172,6 +177,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
this.sysInvalidate = sysInvalidate;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
+
+ needReturnValue(retVal);
+ waitRemoteTransactions(waitRemoteTxs);
}
/**
@@ -224,11 +232,13 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
@Nullable UUID subjId,
int taskNameHash,
boolean addDepInfo,
- Collection<Long> updateIdxs
+ Collection<Long> updateIdxs,
+ boolean retVal,
+ boolean waitRemoteTxs
) {
this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc,
sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize,
- subjId, taskNameHash, addDepInfo);
+ subjId, taskNameHash, addDepInfo, retVal, waitRemoteTxs);
if (updateIdxs != null && !updateIdxs.isEmpty()) {
partUpdateCnt = new GridLongList(updateIdxs.size());
@@ -339,6 +349,23 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
flags &= ~WAIT_REMOTE_TX_FLAG_MASK;
}
+ /**
+ * @return {@code true} if the {@code NEED_RETURN_VALUE_FLAG_MASK} bit is set in {@code flags}, i.e. the transaction needs return value.
+ */
+ public boolean needReturnValue() {
+ return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0;
+ }
+
+ /**
+ * @param retVal {@code true} to set the {@code NEED_RETURN_VALUE_FLAG_MASK} bit in {@code flags}, {@code false} to clear it.
+ */
+ public void needReturnValue(boolean retVal) {
+ if (retVal)
+ flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK);
+ else
+ flags &= ~NEED_RETURN_VALUE_FLAG_MASK;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtTxFinishRequest.class, this, super.toString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index 78dc16f..0618172 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -19,9 +19,10 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.io.Externalizable;
import java.nio.ByteBuffer;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -51,6 +52,9 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** Flag indicating if this is a check-committed response. */
private boolean checkCommitted;
+ /** Cache return value. */
+ private GridCacheReturn retVal;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -112,6 +116,14 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
if (checkCommittedErr != null && checkCommittedErrBytes == null)
checkCommittedErrBytes = ctx.marshaller().marshal(checkCommittedErr);
+
+ if (retVal != null && retVal.cacheId() != 0) {
+ GridCacheContext cctx = ctx.cacheContext(retVal.cacheId());
+
+ assert cctx != null : retVal.cacheId();
+
+ retVal.prepareMarshal(cctx);
+ }
}
/** {@inheritDoc} */
@@ -121,6 +133,28 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
if (checkCommittedErrBytes != null && checkCommittedErr == null)
checkCommittedErr = ctx.marshaller().unmarshal(checkCommittedErrBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+
+ if (retVal != null && retVal.cacheId() != 0) {
+ GridCacheContext cctx = ctx.cacheContext(retVal.cacheId());
+
+ assert cctx != null : retVal.cacheId();
+
+ retVal.finishUnmarshal(cctx, ldr);
+ }
+ }
+
+ /**
+ * @param retVal Cache return value to store in this response, may be {@code null}.
+ */
+ public void returnValue(GridCacheReturn retVal) {
+ this.retVal = retVal;
+ }
+
+ /**
+ * @return Cache return value stored in this response, or {@code null} if none was set.
+ */
+ public GridCacheReturn returnValue() {
+ return retVal;
+ }
/** {@inheritDoc} */
@@ -161,6 +195,12 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
writer.incrementState();
+ case 8:
+ if (!writer.writeMessage("retVal", retVal))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -201,6 +241,14 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
reader.incrementState();
+ case 8:
+ retVal = reader.readMessage("retVal");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(GridDhtTxFinishResponse.class);
@@ -213,6 +261,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 8;
+ return 9;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
new file mode 100644
index 0000000..0c8ae69
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * One Phase Commit Near transaction ack request.
+ */
+public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Lock or transaction versions. */
+ @GridToStringInclude
+ @GridDirectCollection(GridCacheVersion.class)
+ protected Collection<GridCacheVersion> vers;
+
+ /**
+ * Default constructor.
+ */
+ public GridDhtTxOnePhaseCommitAckRequest() {
+ // No-op.
+ }
+
+ /**
+ * Creates a request carrying the given versions.
+ * @param vers Near Tx xid Versions.
+ */
+ public GridDhtTxOnePhaseCommitAckRequest(Collection<GridCacheVersion> vers) {
+ this.vers = vers;
+ }
+
+ /**
+ * @return Versions carried by this request.
+ */
+ public Collection<GridCacheVersion> versions() {
+ return vers;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtTxOnePhaseCommitAckRequest.class, this, super.toString());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return addDepInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeCollection("vers", vers, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ vers = reader.readCollection("vers", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridDhtTxOnePhaseCommitAckRequest.class);
+ }
+
+ /** {@inheritDoc} The value -27 is the type this request is registered under in {@code GridIoMessageFactory}. */
+ @Override public byte directType() {
+ return -27;
+ }
+
+ /** {@inheritDoc} {@code vers} is written at state 3; states 0-2 are presumably the superclass fields (confirm). */
+ @Override public byte fieldsCount() {
+ return 4;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index ec73bff..1dbda69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1245,7 +1245,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
tx.onePhaseCommit(),
tx.subjectId(),
tx.taskNameHash(),
- tx.activeCachesDeploymentEnabled());
+ tx.activeCachesDeploymentEnabled(),
+ retVal);
int idx = 0;
@@ -1356,7 +1357,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
tx.onePhaseCommit(),
tx.subjectId(),
tx.taskNameHash(),
- tx.activeCachesDeploymentEnabled());
+ tx.activeCachesDeploymentEnabled(),
+ retVal);
for (IgniteTxEntry entry : nearMapping.entries()) {
if (CU.writes().apply(entry)) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 1cdc96f..a8f2087 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -52,6 +52,9 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** */
private static final long serialVersionUID = 0L;
+ /** Bit mask within {@code flags} that is set when the transaction needs a return value. */
+ public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x01;
+
/** Max order. */
private UUID nearNodeId;
@@ -100,6 +103,9 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** Preload keys. */
private BitSet preloadKeys;
+ /** Packed bit flags of this request; see {@link #NEED_RETURN_VALUE_FLAG_MASK}. */
+ private byte flags;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -118,6 +124,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
* @param txNodes Transaction nodes mapping.
* @param nearXidVer Near transaction ID.
* @param last {@code True} if this is last prepare request for node.
+ * @param retVal Need return value flag.
* @param addDepInfo Deployment info flag.
*/
public GridDhtTxPrepareRequest(
@@ -134,7 +141,8 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
boolean onePhaseCommit,
UUID subjId,
int taskNameHash,
- boolean addDepInfo) {
+ boolean addDepInfo,
+ boolean retVal) {
super(tx, timeout, null, dhtWrites, txNodes, onePhaseCommit, addDepInfo);
assert futId != null;
@@ -149,12 +157,31 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
this.subjId = subjId;
this.taskNameHash = taskNameHash;
+ needReturnValue(retVal);
+
invalidateNearEntries = new BitSet(dhtWrites == null ? 0 : dhtWrites.size());
nearNodeId = tx.nearNodeId();
}
/**
+ * @return Flag indicating whether transaction needs return value.
+ */
+ public boolean needReturnValue() {
+ return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0;
+ }
+
+ /**
+ * @param retVal Need return value.
+ */
+ public void needReturnValue(boolean retVal) {
+ if (retVal)
+ flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK);
+ else
+ flags &= ~NEED_RETURN_VALUE_FLAG_MASK;
+ }
+
+ /**
* @return {@code True} if this is last prepare request for node.
*/
public boolean last() {
@@ -348,78 +375,84 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
switch (writer.state()) {
case 23:
- if (!writer.writeIgniteUuid("futId", futId))
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
case 24:
- if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries))
+ if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
case 25:
- if (!writer.writeBoolean("last", last))
+ if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries))
return false;
writer.incrementState();
case 26:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeBoolean("last", last))
return false;
writer.incrementState();
case 27:
- if (!writer.writeUuid("nearNodeId", nearNodeId))
+ if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
case 28:
- if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
+ if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
writer.incrementState();
case 29:
- if (!writer.writeMessage("nearXidVer", nearXidVer))
+ if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 30:
- if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("nearXidVer", nearXidVer))
return false;
writer.incrementState();
case 31:
- if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
+ if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 32:
- if (!writer.writeBitSet("preloadKeys", preloadKeys))
+ if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 33:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeBitSet("preloadKeys", preloadKeys))
return false;
writer.incrementState();
case 34:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 35:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 36:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -442,7 +475,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
switch (reader.state()) {
case 23:
- futId = reader.readIgniteUuid("futId");
+ flags = reader.readByte("flags");
if (!reader.isLastRead())
return false;
@@ -450,7 +483,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 24:
- invalidateNearEntries = reader.readBitSet("invalidateNearEntries");
+ futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
return false;
@@ -458,7 +491,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 25:
- last = reader.readBoolean("last");
+ invalidateNearEntries = reader.readBitSet("invalidateNearEntries");
if (!reader.isLastRead())
return false;
@@ -466,7 +499,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 26:
- miniId = reader.readIgniteUuid("miniId");
+ last = reader.readBoolean("last");
if (!reader.isLastRead())
return false;
@@ -474,7 +507,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 27:
- nearNodeId = reader.readUuid("nearNodeId");
+ miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
return false;
@@ -482,7 +515,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 28:
- nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
+ nearNodeId = reader.readUuid("nearNodeId");
if (!reader.isLastRead())
return false;
@@ -490,7 +523,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 29:
- nearXidVer = reader.readMessage("nearXidVer");
+ nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -498,7 +531,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 30:
- ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
+ nearXidVer = reader.readMessage("nearXidVer");
if (!reader.isLastRead())
return false;
@@ -506,7 +539,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 31:
- ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
+ ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -514,7 +547,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 32:
- preloadKeys = reader.readBitSet("preloadKeys");
+ ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -522,7 +555,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 33:
- subjId = reader.readUuid("subjId");
+ preloadKeys = reader.readBitSet("preloadKeys");
if (!reader.isLastRead())
return false;
@@ -530,7 +563,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 34:
- taskNameHash = reader.readInt("taskNameHash");
+ subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
return false;
@@ -538,6 +571,14 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
case 35:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 36:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -557,6 +598,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 36;
+ return 37;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index dc27eb1..6ad20c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -189,9 +189,9 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
commitVer,
sys,
plc,
- concurrency,
- isolation,
- invalidate,
+ concurrency,
+ isolation,
+ invalidate,
timeout,
txSize,
subjId,
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 1e45fa7..30a3d57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -29,9 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorResult;
@@ -60,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
+import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
@@ -82,7 +80,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -102,11 +99,9 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteOutClosure;
-import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE;
@@ -144,7 +139,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
/** Pending */
- private ConcurrentMap<UUID, DeferredResponseBuffer> pendingResponses = new ConcurrentHashMap8<>();
+ private GridDeferredAckMessageSender deferredUpdateMessageSender;
/** */
private GridNearAtomicCache<K, V> near;
@@ -240,6 +235,53 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public void start() throws IgniteCheckedException {
super.start();
+ deferredUpdateMessageSender = new GridDeferredAckMessageSender(ctx.time(), ctx.closures()) {
+ @Override public int getTimeout() {
+ return DEFERRED_UPDATE_RESPONSE_TIMEOUT;
+ }
+
+ @Override public int getBufferSize() {
+ return DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE;
+ }
+
+ @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers) {
+ GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
+ vers, ctx.deploymentEnabled());
+
+ try {
+ ctx.kernalContext().gateway().readLock();
+
+ try {
+ ctx.io().send(nodeId, msg, ctx.ioPolicy());
+
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureVersions() +
+ ", node=" + nodeId + ']');
+ }
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
+ }
+ catch (IllegalStateException ignored) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Failed to send deferred DHT update response, node is stopping [" +
+ "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
+ }
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Failed to send deferred DHT update response, node left [" +
+ "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send deferred DHT update response to remote node [" +
+ "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']', e);
+ }
+ }
+ };
+
CacheMetricsImpl m = new CacheMetricsImpl(ctx);
if (ctx.dht().near() != null)
@@ -405,8 +447,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public void stop() {
- for (DeferredResponseBuffer buf : pendingResponses.values())
- buf.finish();
+ deferredUpdateMessageSender.stop();
}
/**
@@ -3208,28 +3249,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param ver Version to ack.
*/
private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver) {
- while (true) {
- DeferredResponseBuffer buf = pendingResponses.get(nodeId);
-
- if (buf == null) {
- buf = new DeferredResponseBuffer(nodeId);
-
- DeferredResponseBuffer old = pendingResponses.putIfAbsent(nodeId, buf);
-
- if (old == null) {
- // We have successfully added buffer to map.
- ctx.time().addTimeoutObject(buf);
- }
- else
- buf = old;
- }
-
- if (!buf.addResponse(ver))
- // Some thread is sending filled up buffer, we can remove it.
- pendingResponses.remove(nodeId, buf);
- else
- break;
- }
+ deferredUpdateMessageSender.sendDeferredAckMessage(nodeId, ver);
}
/**
@@ -3452,149 +3472,4 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return Collections.emptyList();
}
}
-
- /**
- * Deferred response buffer.
- */
- private class DeferredResponseBuffer extends ReentrantReadWriteLock implements GridTimeoutObject {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Filled atomic flag. */
- private AtomicBoolean guard = new AtomicBoolean(false);
-
- /** Response versions. */
- private ConcurrentLinkedDeque8<GridCacheVersion> respVers = new ConcurrentLinkedDeque8<>();
-
- /** Node ID. */
- private final UUID nodeId;
-
- /** Timeout ID. */
- private final IgniteUuid timeoutId;
-
- /** End time. */
- private final long endTime;
-
- /**
- * @param nodeId Node ID to send message to.
- */
- private DeferredResponseBuffer(UUID nodeId) {
- this.nodeId = nodeId;
-
- timeoutId = IgniteUuid.fromUuid(nodeId);
-
- endTime = U.currentTimeMillis() + DEFERRED_UPDATE_RESPONSE_TIMEOUT;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid timeoutId() {
- return timeoutId;
- }
-
- /** {@inheritDoc} */
- @Override public long endTime() {
- return endTime;
- }
-
- /** {@inheritDoc} */
- @Override public void onTimeout() {
- if (guard.compareAndSet(false, true)) {
- ctx.closures().runLocalSafe(new Runnable() {
- @Override public void run() {
- writeLock().lock();
-
- try {
- finish();
- }
- finally {
- writeLock().unlock();
- }
- }
- });
- }
- }
-
- /**
- * Adds deferred response to buffer.
- *
- * @param ver Version to send.
- * @return {@code True} if response was handled, {@code false} if this buffer is filled and cannot be used.
- */
- public boolean addResponse(GridCacheVersion ver) {
- readLock().lock();
-
- boolean snd = false;
-
- try {
- if (guard.get())
- return false;
-
- respVers.add(ver);
-
- if (respVers.sizex() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true))
- snd = true;
- }
- finally {
- readLock().unlock();
- }
-
- if (snd) {
- // Wait all threads in read lock to finish.
- writeLock().lock();
-
- try {
- finish();
-
- ctx.time().removeTimeoutObject(this);
- }
- finally {
- writeLock().unlock();
- }
- }
-
- return true;
- }
-
- /**
- * Sends deferred notification message and removes this buffer from pending responses map.
- */
- private void finish() {
- GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
- respVers, ctx.deploymentEnabled());
-
- try {
- ctx.kernalContext().gateway().readLock();
-
- try {
- ctx.io().send(nodeId, msg, ctx.ioPolicy());
-
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureVersions() +
- ", node=" + nodeId + ']');
- }
- }
- finally {
- ctx.kernalContext().gateway().readUnlock();
- }
- }
- catch (IllegalStateException ignored) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Failed to send deferred DHT update response, node is stopping [" +
- "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
- }
- }
- catch (ClusterTopologyCheckedException ignored) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Failed to send deferred DHT update response, node left [" +
- "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
- }
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send deferred DHT update response to remote node [" +
- "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']', e);
- }
-
- pendingResponses.remove(nodeId, this);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index d251528..4cbfb27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -526,7 +526,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
) {
GridCacheContext cacheCtx = entry.context();
- List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
+ List<ClusterNode> nodes = cacheCtx.isLocal() ?
+ cacheCtx.affinity().nodes(entry.key(), topVer) :
+ cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer);
txMapping.addMapping(nodes);
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index e17a76c..91cfbda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -27,7 +27,6 @@ import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
@@ -599,9 +598,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
GridCacheEntryEx cached0 = entry.cached();
if (cached0.isDht())
- nodes = cacheCtx.affinity().nodes(cached0.partition(), topVer);
+ nodes = cacheCtx.topology().nodes(cached0.partition(), topVer);
else
- nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
+ nodes = cacheCtx.isLocal() ?
+ cacheCtx.affinity().nodes(entry.key(), topVer) :
+ cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer);
txMapping.addMapping(nodes);
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 34b8281..5c09398 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -193,7 +193,9 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
GridCacheContext cacheCtx = txEntry.context();
- List<ClusterNode> nodes = cacheCtx.affinity().nodes(txEntry.key(), topVer);
+ List<ClusterNode> nodes = cacheCtx.isLocal() ?
+ cacheCtx.affinity().nodes(txEntry.key(), topVer) :
+ cacheCtx.topology().nodes(cacheCtx.affinity().partition(txEntry.key()), topVer);
ClusterNode primary = F.first(nodes);
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index bb5d482..46604c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -34,6 +34,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
@@ -76,6 +78,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
public static final IgniteProductVersion PRIMARY_SYNC_TXS_SINCE = IgniteProductVersion.fromString("1.6.0");
/** */
+ public static final IgniteProductVersion ACK_DHT_ONE_PHASE_SINCE = IgniteProductVersion.fromString("1.6.8");
+
+ /** */
private static final long serialVersionUID = 0L;
/** Logger reference. */
@@ -251,6 +256,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
assert f.node().id().equals(nodeId);
+ if (res.returnValue() != null)
+ tx.implicitSingleResult(res.returnValue());
+
f.onDhtFinishResponse(res);
}
}
@@ -432,6 +440,50 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
catch (IgniteCheckedException e) {
onDone(e);
}
+ finally {
+ if (commit &&
+ tx.onePhaseCommit() &&
+ !tx.writeMap().isEmpty()) // Readonly operations require no ack.
+ ackBackup();
+ }
+ }
+
+ /**
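+ * Acknowledges the commit to the single backup of an implicit transaction that needs a return value: removes the tx return via the tx manager if the backup is the local node, otherwise sends a deferred ack to backups of version 1.6.8 or later.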
+ */
+ private void ackBackup() {
+ if (mappings.empty())
+ return;
+
+ if (!tx.needReturnValue() || !tx.implicit())
+ return; // GridCacheReturn was not saved at backup.
+
+ GridDistributedTxMapping mapping = mappings.singleMapping();
+
+ if (mapping != null) {
+ UUID nodeId = mapping.node().id();
+
+ Collection<UUID> backups = tx.transactionNodes().get(nodeId);
+
+ if (!F.isEmpty(backups)) {
+ assert backups.size() == 1 : backups;
+
+ UUID backupId = F.first(backups);
+
+ ClusterNode backup = cctx.discovery().node(backupId);
+
+ // Nothing to do if backup has left the grid.
+ if (backup == null) {
+ // No-op.
+ }
+ else if (backup.isLocal())
+ cctx.tm().removeTxReturn(tx.xidVersion());
+ else {
+ if (ACK_DHT_ONE_PHASE_SINCE.compareToIgnoreTimestamp(backup.version()) <= 0)
+ cctx.tm().sendDeferredAckResponse(backupId, tx.xidVersion());
+ }
+ }
+ }
}
/**
@@ -475,23 +527,48 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
readyNearMappingFromBackup(mapping);
if (committed) {
- if (tx.syncMode() == FULL_SYNC) {
- GridCacheVersion nearXidVer = tx.nearXidVersion();
+ try {
+ if (tx.needReturnValue() && tx.implicit()) {
+ GridCacheReturnCompletableWrapper wrapper =
+ cctx.tm().getCommittedTxReturn(tx.xidVersion());
- assert nearXidVer != null : tx;
+ assert wrapper != null : tx.xidVersion();
- IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(nearXidVer);
+ GridCacheReturn retVal = wrapper.fut().get();
- fut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
- mini.onDone(tx);
- }
- });
+ assert retVal != null;
+
+ tx.implicitSingleResult(retVal);
+ }
- return;
+ if (tx.syncMode() == FULL_SYNC) {
+ GridCacheVersion nearXidVer = tx.nearXidVersion();
+
+ assert nearXidVer != null : tx;
+
+ IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(nearXidVer);
+
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ mini.onDone(tx);
+ }
+ });
+
+ return;
+ }
+
+ mini.onDone(tx);
}
+ catch (IgniteCheckedException e) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Near finish fut, failed to finish [" +
+ "txId=" + tx.nearXidVersion() +
+ ", node=" + backup.id() +
+ ", err=" + e + ']');
+ }
- mini.onDone(tx);
+ mini.onDone(e);
+ }
}
else {
ClusterTopologyCheckedException cause =
@@ -504,7 +581,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
}
else {
- GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId());
+ GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId(), false);
// Preserve old behavior, otherwise response is not sent.
if (WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) > 0)
@@ -765,9 +842,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
/**
* @param miniId Mini future ID.
+ * @param waitRemoteTxs Wait for remote txs.
* @return Finish request.
*/
- private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId) {
+ private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId, boolean waitRemoteTxs) {
GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
cctx.localNodeId(),
futureId(),
@@ -791,7 +869,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
0,
null,
0,
- tx.activeCachesDeploymentEnabled());
+ tx.activeCachesDeploymentEnabled(),
+ !waitRemoteTxs && (tx.needReturnValue() && tx.implicit()),
+ waitRemoteTxs);
finishReq.checkCommitted(true);
@@ -872,9 +952,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
add(mini);
- GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId());
-
- req.waitRemoteTransactions(true);
+ GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId(), true);
for (UUID backupId : backups) {
ClusterNode backup = cctx.discovery().node(backupId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index eb2989e..18c3011 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
@@ -151,6 +152,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
@GridToStringExclude
protected GridCacheSharedContext<?, ?> cctx;
+ /** Need return value. */
+ protected boolean needRetVal;
+
/**
* End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>)
* assigned to this transaction at the end of write phase.
@@ -695,6 +699,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/**
+ * @return Flag indicating whether transaction needs return value.
+ */
+ public boolean needReturnValue() {
+ return needRetVal;
+ }
+
+ /**
+ * @param needRetVal Need return value flag.
+ */
+ public void needReturnValue(boolean needRetVal) {
+ this.needRetVal = needRetVal;
+ }
+
+ /**
* Gets remaining allowed transaction time.
*
* @return Remaining transaction time. {@code 0} if timeout isn't specified. {@code -1} if time is out.
@@ -1285,7 +1303,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
if (intercept || !F.isEmpty(e.entryProcessors()))
e.cached().unswap(false);
- IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(e, false);
+ IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(e, false, null);
GridCacheContext cacheCtx = e.context();
@@ -1443,13 +1461,15 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
/**
* @param txEntry Entry to process.
* @param metrics {@code True} if metrics should be updated.
+ * @param ret Optional return value to initialize.
* @return Tuple containing transformation results.
* @throws IgniteCheckedException If failed to get previous value for transform.
* @throws GridCacheEntryRemovedException If entry was concurrently deleted.
*/
protected IgniteBiTuple<GridCacheOperation, CacheObject> applyTransformClosures(
IgniteTxEntry txEntry,
- boolean metrics) throws GridCacheEntryRemovedException, IgniteCheckedException {
+ boolean metrics,
+ @Nullable GridCacheReturn ret) throws GridCacheEntryRemovedException, IgniteCheckedException {
GridCacheContext cacheCtx = txEntry.context();
assert cacheCtx != null;
@@ -1457,8 +1477,12 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
if (isSystemInvalidate())
return F.t(cacheCtx.writeThrough() ? RELOAD : DELETE, null);
- if (F.isEmpty(txEntry.entryProcessors()))
+ if (F.isEmpty(txEntry.entryProcessors())) {
+ if (ret != null)
+ ret.value(cacheCtx, txEntry.value(), txEntry.keepBinary());
+
return F.t(txEntry.op(), txEntry.value());
+ }
else {
T2<GridCacheOperation, CacheObject> calcVal = txEntry.entryProcessorCalculatedValue();
@@ -1508,17 +1532,27 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(
txEntry.key(), key, cacheVal, val, ver, keepBinary, txEntry.cached());
+ Object procRes = null;
+ Exception err = null;
+
try {
EntryProcessor<Object, Object, Object> processor = t.get1();
- processor.process(invokeEntry, t.get2());
+ procRes = processor.process(invokeEntry, t.get2());
val = invokeEntry.getValue();
key = invokeEntry.key();
}
- catch (Exception ignore) {
- // No-op.
+ catch (Exception e) {
+ err = e;
+ }
+
+ if (ret != null) {
+ if (err != null || procRes != null)
+ ret.addEntryProcessResult(txEntry.context(), txEntry.key(), null, procRes, err, keepBinary);
+ else
+ ret.invokeResult(true);
}
modified |= invokeEntry.modified();
[05/12] ignite git commit: IGNITE-3406 - Fix incorrect patch.
Posted by vo...@apache.org.
IGNITE-3406 - Fix incorrect patch.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c3eff6b6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c3eff6b6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c3eff6b6
Branch: refs/heads/ignite-1.6.8-hadoop
Commit: c3eff6b6c7817f83f07afcff8784ec6aa9473876
Parents: 147ab9c
Author: dkarachentsev <dk...@gridgain.com>
Authored: Mon Sep 19 16:18:14 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Mon Sep 19 16:18:14 2016 +0300
----------------------------------------------------------------------
...idAbstractCacheInterceptorRebalanceTest.java | 356 +++++++++++++++++++
...heInterceptorAtomicOffheapRebalanceTest.java | 30 ++
...GridCacheInterceptorAtomicRebalanceTest.java | 36 ++
...ceptorTransactionalOffheapRebalanceTest.java | 35 ++
...heInterceptorTransactionalRebalanceTest.java | 36 ++
5 files changed, 493 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c3eff6b6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java
new file mode 100644
index 0000000..9405a19
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import javax.cache.Cache;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheInterceptor;
+import org.apache.ignite.cache.CacheInterceptorAdapter;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public abstract class GridAbstractCacheInterceptorRebalanceTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final String CACHE_NAME = "test_cache";
+
+ /** */
+ private static final int CNT = 10_000;
+
+ /** */
+ private static final int TEST_ITERATIONS = 5;
+
+ /** */
+ private static final int NODES = 5;
+
+ /** */
+ private static volatile boolean failed;
+
+ /** */
+ private static CacheInterceptor<Integer, Integer> interceptor;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception {
+ final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ final CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(CACHE_NAME);
+
+ ccfg.setInterceptor(interceptor);
+ ccfg.setAtomicityMode(atomicityMode());
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setRebalanceMode(SYNC);
+ ccfg.setBackups(2);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ return cfg;
+ }
+
+ /**
+ * @return Cache atomicity mode.
+ */
+ protected abstract CacheAtomicityMode atomicityMode();
+
+ /**
+ * @return Cache memory mode;
+ */
+ protected abstract CacheMemoryMode memoryMode();
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If fail.
+ */
+ public void testRebalanceUpdate() throws Exception {
+ interceptor = new RebalanceUpdateInterceptor();
+
+ testRebalance(new Operation() {
+ @Override public void run(final IgniteCache<Integer, Integer> cache, final Integer key, final Integer val) {
+ cache.put(key, val);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If fail.
+ */
+ public void testRebalanceUpdateInvoke() throws Exception {
+ interceptor = new RebalanceUpdateInterceptor();
+
+ final UpdateEntryProcessor proc = new UpdateEntryProcessor();
+
+ testRebalance(new Operation() {
+ @Override public void run(final IgniteCache<Integer, Integer> cache, final Integer key, final Integer val) {
+ cache.invoke(key, proc, val);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If fail.
+ */
+ public void testRebalanceRemoveInvoke() throws Exception {
+ interceptor = new RebalanceUpdateInterceptor();
+
+ final RemoveEntryProcessor proc = new RemoveEntryProcessor();
+
+ testRebalance(new Operation() {
+ @Override public void run(final IgniteCache<Integer, Integer> cache, final Integer key, final Integer val) {
+ cache.invoke(key, proc, val);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If fail.
+ */
+ public void testRebalanceRemove() throws Exception {
+ interceptor = new RebalanceRemoveInterceptor();
+
+ testRebalance(new Operation() {
+ @Override public void run(final IgniteCache<Integer, Integer> cache, final Integer key, final Integer val) {
+ cache.remove(key);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If fail.
+ */
+ public void testPutIfAbsent() throws Exception {
+ interceptor = new RebalanceUpdateInterceptor();
+
+ testRebalance(new Operation() {
+ @Override public void run(final IgniteCache<Integer, Integer> cache, final Integer key, final Integer val) {
+ cache.putIfAbsent(key, val);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If fail.
+ */
+ public void testGetAndPut() throws Exception {
+ interceptor = new RebalanceUpdateInterceptor();
+
+ testRebalance(new Operation() {
+ @Override public void run(final IgniteCache<Integer, Integer> cache, final Integer key, final Integer val) {
+ final Integer old = cache.getAndPut(key, val);
+
+ assert val == old + 1 : "Unexpected old value: " + old;
+ }
+ });
+ }
+
+ /**
+ * @param operation Operation to be tested.
+ * @throws Exception If fail.
+ */
+ private void testRebalance(final Operation operation) throws Exception {
+ interceptor = new RebalanceUpdateInterceptor();
+
+ for (int iter = 0; iter < TEST_ITERATIONS; iter++) {
+ log.info("Iteration: " + iter);
+
+ failed = false;
+
+ final IgniteEx ignite = startGrid(1);
+
+ final IgniteCache<Integer, Integer> cache = ignite.cache(CACHE_NAME);
+
+ for (int i = 0; i < CNT; i++)
+ cache.put(i, i);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final IgniteInternalFuture<Object> updFut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ latch.await();
+
+ for (int j = 1; j <= 3; j++) {
+ for (int i = 0; i < CNT; i++) {
+ if (i % 2 == 0) {
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ operation.run(cache, i, i + j);
+
+ tx.commit();
+ }
+ }
+ else
+ operation.run(cache, i, i + j);
+ }
+ }
+
+ return null;
+ }
+ });
+
+ final IgniteInternalFuture<Object> rebFut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ latch.await();
+
+ for (int i = 2; i < NODES; i++)
+ startGrid(i);
+
+ return null;
+ }
+ });
+
+ latch.countDown();
+
+ updFut.get();
+ rebFut.get();
+
+ stopAllGrids();
+
+ assertFalse(failed);
+ }
+ }
+
+ /**
+ *
+ */
+ private interface Operation {
+ /**
+ * @param cache Cache.
+ * @param key Key.
+ * @param val Value.
+ */
+ void run(IgniteCache<Integer, Integer> cache, Integer key, Integer val);
+ }
+
+ /**
+ *
+ */
+ private static class UpdateEntryProcessor implements EntryProcessor<Integer, Integer, Integer> {
+ /** {@inheritDoc} */
+ @Override public Integer process(final MutableEntry<Integer, Integer> entry,
+ final Object... arguments) throws EntryProcessorException {
+ entry.setValue((Integer) arguments[0]);
+
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class RemoveEntryProcessor implements EntryProcessor<Integer, Integer, Integer> {
+ /** {@inheritDoc} */
+ @Override public Integer process(final MutableEntry<Integer, Integer> entry,
+ final Object... arguments) throws EntryProcessorException {
+ entry.remove();
+
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class RebalanceUpdateInterceptor extends CacheInterceptorAdapter<Integer, Integer> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Integer onBeforePut(final Cache.Entry entry, final Integer newVal) {
+ try {
+ boolean first = entry.getKey().equals(newVal);
+
+ if (first)
+ assertNull("Expected null old value: " + entry, entry.getValue());
+ else {
+ Integer old = (Integer)entry.getValue();
+
+ assertNotNull("Null old value: " + entry, old);
+ assertEquals("Unexpected old value: " + entry, newVal.intValue(), old + 1);
+ }
+ }
+ catch (Throwable e) {
+ failed = true;
+
+ System.out.println("Unexpected error: " + e);
+ e.printStackTrace(System.out);
+ }
+
+ return newVal;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class RebalanceRemoveInterceptor extends CacheInterceptorAdapter<Integer, Integer> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Nullable @Override public IgniteBiTuple<Boolean, Integer> onBeforeRemove(
+ final Cache.Entry<Integer, Integer> entry) {
+ try {
+ assertNotNull("Null old value: " + entry, entry.getValue());
+ assertEquals("Unexpected old value: " + entry, entry.getKey(), entry.getValue());
+ }
+ catch (Throwable t) {
+ failed = true;
+
+ System.out.println("Unexpected error: " + t);
+ t.printStackTrace(System.out);
+ }
+
+ return new T2<>(true, null);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c3eff6b6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAtomicOffheapRebalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAtomicOffheapRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAtomicOffheapRebalanceTest.java
new file mode 100644
index 0000000..103322f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAtomicOffheapRebalanceTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ *
+ */
+public class GridCacheInterceptorAtomicOffheapRebalanceTest extends GridCacheInterceptorAtomicRebalanceTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return CacheMemoryMode.OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c3eff6b6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAtomicRebalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAtomicRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAtomicRebalanceTest.java
new file mode 100644
index 0000000..aaeda4b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAtomicRebalanceTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ *
+ */
+public class GridCacheInterceptorAtomicRebalanceTest extends GridAbstractCacheInterceptorRebalanceTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return CacheMemoryMode.ONHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c3eff6b6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorTransactionalOffheapRebalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorTransactionalOffheapRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorTransactionalOffheapRebalanceTest.java
new file mode 100644
index 0000000..bb90062
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorTransactionalOffheapRebalanceTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ *
+ */
+public class GridCacheInterceptorTransactionalOffheapRebalanceTest extends GridCacheInterceptorTransactionalRebalanceTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return CacheMemoryMode.OFFHEAP_TIERED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 10 * 60_000;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c3eff6b6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorTransactionalRebalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorTransactionalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorTransactionalRebalanceTest.java
new file mode 100644
index 0000000..bace87c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorTransactionalRebalanceTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ *
+ */
+public class GridCacheInterceptorTransactionalRebalanceTest extends GridAbstractCacheInterceptorRebalanceTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return CacheMemoryMode.ONHEAP_TIERED;
+ }
+}
[09/12] ignite git commit: Client discovery: wait during join if
receive RES_CONTINUE_JOIN, RES_WAIT.
Posted by vo...@apache.org.
Client discovery: wait during join if receive RES_CONTINUE_JOIN, RES_WAIT.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1372ce2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1372ce2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1372ce2
Branch: refs/heads/ignite-1.6.8-hadoop
Commit: c1372ce2f0633968036fcfb079718214605c3350
Parents: 780bf23
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 20 11:39:37 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 20 11:39:37 2016 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 18 +++++++++++++++++-
1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1372ce2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index bf7f519..2c85645 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -497,6 +497,8 @@ class ClientImpl extends TcpDiscoveryImpl {
Iterator<InetSocketAddress> it = addrs.iterator();
+ boolean wait = false;
+
while (it.hasNext()) {
if (Thread.currentThread().isInterrupted())
throw new InterruptedException();
@@ -515,12 +517,17 @@ class ClientImpl extends TcpDiscoveryImpl {
Socket sock = sockAndRes.get1().socket();
+ if (log.isDebugEnabled())
+ log.debug("Received response to join request [addr=" + addr + ", res=" + sockAndRes.get2() + ']');
+
switch (sockAndRes.get2()) {
case RES_OK:
return new T2<>(sockAndRes.get1(), sockAndRes.get3());
case RES_CONTINUE_JOIN:
case RES_WAIT:
+ wait = true;
+
U.closeQuiet(sock);
break;
@@ -533,7 +540,16 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
- if (addrs.isEmpty()) {
+ if (wait) {
+ if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout)
+ return null;
+
+ if (log.isDebugEnabled())
+ log.debug("Will wait before retry join.");
+
+ Thread.sleep(2000);
+ }
+ else if (addrs.isEmpty()) {
if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout)
return null;
[12/12] ignite git commit: Merge branch 'ignite-1.6.8' into
ignite-1.6.8-hadoop
Posted by vo...@apache.org.
Merge branch 'ignite-1.6.8' into ignite-1.6.8-hadoop
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4d08b5cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4d08b5cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4d08b5cb
Branch: refs/heads/ignite-1.6.8-hadoop
Commit: 4d08b5cb52a7d0dc1218ebdb750cef8b0032f810
Parents: ee5a818 16b82b7
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Sep 23 13:40:27 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Sep 23 13:40:27 2016 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 12 +
.../internal/GridEventConsumeHandler.java | 5 -
.../internal/GridMessageListenHandler.java | 5 -
.../internal/binary/BinaryObjectExImpl.java | 164 ++++++---
.../communication/GridIoMessageFactory.java | 6 +
.../processors/cache/GridCacheEntryEx.java | 8 +
.../processors/cache/GridCacheMapEntry.java | 9 +-
.../GridCacheReturnCompletableWrapper.java | 101 ++++++
.../cache/GridDeferredAckMessageSender.java | 219 ++++++++++++
.../GridDistributedTxRemoteAdapter.java | 65 +++-
.../distributed/dht/GridDhtTxFinishFuture.java | 12 +-
.../distributed/dht/GridDhtTxFinishRequest.java | 33 +-
.../dht/GridDhtTxFinishResponse.java | 52 ++-
.../dht/GridDhtTxOnePhaseCommitAckRequest.java | 134 +++++++
.../distributed/dht/GridDhtTxPrepareFuture.java | 42 ++-
.../dht/GridDhtTxPrepareRequest.java | 93 +++--
.../cache/distributed/dht/GridDhtTxRemote.java | 6 +-
.../dht/atomic/GridDhtAtomicCache.java | 227 +++---------
...arOptimisticSerializableTxPrepareFuture.java | 4 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 7 +-
.../GridNearPessimisticTxPrepareFuture.java | 4 +-
.../near/GridNearTxFinishFuture.java | 112 +++++-
.../continuous/CacheContinuousQueryHandler.java | 5 -
.../cache/transactions/IgniteTxAdapter.java | 46 ++-
.../cache/transactions/IgniteTxEntry.java | 44 ++-
.../cache/transactions/IgniteTxHandler.java | 163 +++++++--
.../transactions/IgniteTxLocalAdapter.java | 27 +-
.../cache/transactions/IgniteTxManager.java | 154 +++++++-
.../continuous/GridContinuousHandler.java | 8 -
.../continuous/GridContinuousProcessor.java | 33 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 18 +-
.../spi/swapspace/file/FileSwapSpaceSpi.java | 38 +-
.../binary/BinaryObjectToStringSelfTest.java | 92 +++++
.../CacheSwapUnswapGetTestSmallQueueSize.java | 35 ++
...idAbstractCacheInterceptorRebalanceTest.java | 356 +++++++++++++++++++
...heInterceptorAtomicOffheapRebalanceTest.java | 30 ++
...GridCacheInterceptorAtomicRebalanceTest.java | 36 ++
...ceptorTransactionalOffheapRebalanceTest.java | 35 ++
...heInterceptorTransactionalRebalanceTest.java | 36 ++
.../processors/cache/GridCacheTestEntryEx.java | 4 +
.../IgniteCacheInterceptorSelfTestSuite.java | 5 +
.../IgniteCachePutRetryAbstractSelfTest.java | 39 +-
...gniteCachePutRetryTransactionalSelfTest.java | 75 +++-
...ContinuousQueryFailoverAbstractSelfTest.java | 99 ++++++
...eContinuousQueryMultiNodesFilteringTest.java | 161 +++++++++
.../file/GridFileSwapSpaceSpiSelfTest.java | 89 +++++
.../IgniteBinaryObjectsTestSuite.java | 2 +
.../testsuites/IgniteCacheTestSuite4.java | 2 +
.../config/benchmark-client-mode.properties | 2 +
.../config/benchmark-tx-win.properties | 2 +
.../yardstick/config/benchmark-tx.properties | 2 +
.../yardstick/config/benchmark-win.properties | 2 +
modules/yardstick/config/benchmark.properties | 2 +
.../cache/IgniteGetAndPutBenchmark.java | 41 +++
.../cache/IgniteGetAndPutTxBenchmark.java | 70 ++++
.../cache/IgniteInvokeTxBenchmark.java | 40 +++
56 files changed, 2666 insertions(+), 447 deletions(-)
----------------------------------------------------------------------
[04/12] ignite git commit: IGNITE-3406 - Interceptor and continuous
query get correct old value during rebalancing.
Posted by vo...@apache.org.
IGNITE-3406 - Interceptor and continuous query get correct old value during rebalancing.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/147ab9c0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/147ab9c0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/147ab9c0
Branch: refs/heads/ignite-1.6.8-hadoop
Commit: 147ab9c08f6ac7edecf656b23d8b25bfab91becf
Parents: c24caba
Author: dkarachentsev <dk...@gridgain.com>
Authored: Mon Sep 19 13:58:41 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Mon Sep 19 13:58:41 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheEntryEx.java | 8 ++
.../processors/cache/GridCacheMapEntry.java | 9 +-
.../GridDistributedTxRemoteAdapter.java | 6 ++
.../distributed/dht/GridDhtTxPrepareFuture.java | 36 ++++++-
.../cache/transactions/IgniteTxEntry.java | 44 ++++++++-
.../transactions/IgniteTxLocalAdapter.java | 8 ++
.../processors/cache/GridCacheTestEntryEx.java | 4 +
.../IgniteCacheInterceptorSelfTestSuite.java | 5 +
...ContinuousQueryFailoverAbstractSelfTest.java | 99 ++++++++++++++++++++
9 files changed, 213 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 616854f..ef6a244 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -358,6 +358,8 @@ public interface GridCacheEntryEx {
* @param evt Flag to signal event notification.
* @param metrics Flag to signal metrics update.
* @param keepBinary Keep binary flag.
+ * @param oldValPresent {@code True} if old value is present.
+ * @param oldVal Old value.
* @param topVer Topology version.
* @param filter Filter.
* @param drType DR type.
@@ -383,6 +385,8 @@ public interface GridCacheEntryEx {
boolean evt,
boolean metrics,
boolean keepBinary,
+ boolean oldValPresent,
+ @Nullable CacheObject oldVal,
AffinityTopologyVersion topVer,
CacheEntryPredicate[] filter,
GridDrType drType,
@@ -402,6 +406,8 @@ public interface GridCacheEntryEx {
* @param evt Flag to signal event notification.
* @param metrics Flag to signal metrics notification.
* @param keepBinary Keep binary flag.
+ * @param oldValPresent {@code True} if old value is present.
+ * @param oldVal Old value.
* @param topVer Topology version.
* @param filter Filter.
* @param drType DR type.
@@ -422,6 +428,8 @@ public interface GridCacheEntryEx {
boolean evt,
boolean metrics,
boolean keepBinary,
+ boolean oldValPresent,
+ @Nullable CacheObject oldVal,
AffinityTopologyVersion topVer,
CacheEntryPredicate[] filter,
GridDrType drType,
http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index c760ac1..a9ac1e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1141,6 +1141,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean evt,
boolean metrics,
boolean keepBinary,
+ boolean oldValPresent,
+ @Nullable CacheObject oldVal,
AffinityTopologyVersion topVer,
CacheEntryPredicate[] filter,
GridDrType drType,
@@ -1198,7 +1200,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
Map<UUID, CacheContinuousQueryListener> lsnrCol =
notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null;
- old = (retval || intercept || lsnrCol != null) ?
+ old = oldValPresent ? oldVal :
+ (retval || intercept || lsnrCol != null) ?
rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : this.val;
if (intercept) {
@@ -1333,6 +1336,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean evt,
boolean metrics,
boolean keepBinary,
+ boolean oldValPresent,
+ @Nullable CacheObject oldVal,
AffinityTopologyVersion topVer,
CacheEntryPredicate[] filter,
GridDrType drType,
@@ -1403,7 +1408,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
Map<UUID, CacheContinuousQueryListener> lsnrCol =
notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null;
- old = (retval || intercept || lsnrCol != null) ?
+ old = oldValPresent ? oldVal : (retval || intercept || lsnrCol != null) ?
rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
if (intercept) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index c56d1f7..9d9862a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -542,6 +542,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
true,
true,
txEntry.keepBinary(),
+ txEntry.hasOldValue(),
+ txEntry.oldValue(),
topVer,
null,
replicate ? DR_BACKUP : DR_NONE,
@@ -561,6 +563,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
true,
true,
txEntry.keepBinary(),
+ txEntry.hasOldValue(),
+ txEntry.oldValue(),
topVer,
null,
replicate ? DR_BACKUP : DR_NONE,
@@ -592,6 +596,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
true,
true,
txEntry.keepBinary(),
+ txEntry.hasOldValue(),
+ txEntry.oldValue(),
topVer,
null,
replicate ? DR_BACKUP : DR_NONE,
http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 1bdd9b8..ec73bff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -360,7 +360,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters());
- if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
+ CacheObject val;
+ CacheObject oldVal = null;
+
+ boolean readOld = hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM;
+
+ if (readOld) {
cached.unswap(retVal);
boolean readThrough = !txEntry.skipStore() &&
@@ -375,7 +380,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
final boolean keepBinary = txEntry.keepBinary();
- CacheObject val = cached.innerGet(
+ val = oldVal = cached.innerGet(
null,
tx,
/*swap*/true,
@@ -470,6 +475,33 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
else
ret.success(txEntry.op() != DELETE || cached.hasValue());
}
+
+ // Send old value if rebalancing is not finished.
+ final boolean sndOldVal = !cacheCtx.isLocal() && !cacheCtx.topology().rebalanceFinished(tx.topologyVersion());
+
+ if (sndOldVal) {
+ if (oldVal == null && !readOld) {
+ oldVal = cached.innerGet(
+ null,
+ tx,
+ /*swap*/true,
+ /*readThrough*/false,
+ /*metrics*/false,
+ /*event*/false,
+ /*tmp*/false,
+ /*subjectId*/tx.subjectId(),
+ /*transformClo*/null,
+ /*taskName*/null,
+ /*expiryPlc*/null,
+ /*keepBinary*/true);
+ }
+
+ if (oldVal != null) {
+ oldVal.prepareMarshal(cacheCtx.cacheObjectContext());
+
+ txEntry.oldValue(oldVal, true);
+ }
+ }
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to get result value for cache entry: " + cached, e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 87b2525..194208e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -115,6 +115,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
@GridDirectTransient
private TxEntryValueHolder prevVal = new TxEntryValueHolder();
+ /** Old value before update. */
+ @GridToStringInclude
+ private TxEntryValueHolder oldVal = new TxEntryValueHolder();
+
/** Transform. */
@GridToStringInclude
@GridDirectTransient
@@ -497,7 +501,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
}
/**
- * @param oldValOnPrimary {@code True} If old value for 'invoke' operation was non null on primary node.
+ * @param oldValOnPrimary {@code True} if old value was non-null on primary node.
*/
public void oldValueOnPrimary(boolean oldValOnPrimary) {
setFlag(oldValOnPrimary, OLD_VAL_ON_PRIMARY);
@@ -583,6 +587,30 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
}
/**
+ * @return Old value.
+ */
+ @Nullable public CacheObject oldValue() {
+ return oldVal != null ? oldVal.value() : null;
+ }
+
+ /**
+ * @param oldVal Old value.
+ */
+ public void oldValue(CacheObject oldVal, boolean hasOldVal) {
+ if (this.oldVal == null)
+ this.oldVal = new TxEntryValueHolder();
+
+ this.oldVal.value(op(), oldVal, hasOldVal, hasOldVal);
+ }
+
+ /**
+ * @return {@code True} if old value present.
+ */
+ public boolean hasOldValue() {
+ return oldVal != null && oldVal.hasValue();
+ }
+
+ /**
* @return {@code True} if has value explicitly set.
*/
public boolean hasValue() {
@@ -1069,6 +1097,11 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
writer.incrementState();
+ case 13:
+ if (!writer.writeMessage("oldVal", oldVal))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -1186,6 +1219,13 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
+ case 13:
+ oldVal = reader.readMessage("oldVal");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
}
return reader.afterMessageRead(IgniteTxEntry.class);
@@ -1198,7 +1238,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 13;
+ return 14;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index ee992cc..637f322 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -809,6 +809,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
evt,
metrics,
txEntry.keepBinary(),
+ txEntry.hasOldValue(),
+ txEntry.oldValue(),
topVer,
null,
cached.detached() ? DR_NONE : drType,
@@ -834,6 +836,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
false,
metrics,
txEntry.keepBinary(),
+ txEntry.hasOldValue(),
+ txEntry.oldValue(),
topVer,
CU.empty0(),
DR_NONE,
@@ -854,6 +858,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
evt,
metrics,
txEntry.keepBinary(),
+ txEntry.hasOldValue(),
+ txEntry.oldValue(),
topVer,
null,
cached.detached() ? DR_NONE : drType,
@@ -875,6 +881,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
false,
metrics,
txEntry.keepBinary(),
+ txEntry.hasOldValue(),
+ txEntry.oldValue(),
topVer,
CU.empty0(),
DR_NONE,
http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 400fb14..bf543cb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -477,6 +477,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
boolean evt,
boolean metrics,
boolean keepBinary,
+ boolean hasOldVal,
+ @Nullable CacheObject oldVal,
AffinityTopologyVersion topVer,
CacheEntryPredicate[] filter,
GridDrType drType,
@@ -556,6 +558,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
boolean evt,
boolean metrics,
boolean keepBinary,
+ boolean oldValPresent,
+ @Nullable CacheObject oldVal,
AffinityTopologyVersion topVer,
CacheEntryPredicate[] filter,
GridDrType drType,
http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
index d19ecd7..17d88ae 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
@@ -58,6 +58,11 @@ public class IgniteCacheInterceptorSelfTestSuite extends TestSuite {
suite.addTestSuite(CacheInterceptorPartitionCounterRandomOperationsTest.class);
suite.addTestSuite(CacheInterceptorPartitionCounterLocalSanityTest.class);
+ suite.addTestSuite(GridCacheInterceptorAtomicRebalanceTest.class);
+ suite.addTestSuite(GridCacheInterceptorTransactionalRebalanceTest.class);
+ suite.addTestSuite(GridCacheInterceptorAtomicOffheapRebalanceTest.class);
+ suite.addTestSuite(GridCacheInterceptorTransactionalOffheapRebalanceTest.class);
+
return suite;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 083367c..1376be1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -57,6 +57,8 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
@@ -65,6 +67,7 @@ import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
@@ -312,6 +315,102 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
}
/**
+ * Tests that the correct old value is passed to the continuous query during rebalancing.
+ *
+ * @throws Exception If fail.
+ */
+ public void testRebalance() throws Exception {
+ for (int iter = 0; iter < 5; iter++) {
+ log.info("Iteration: " + iter);
+
+ final IgniteEx ignite = startGrid(1);
+
+ final CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>("testCache");
+
+ ccfg.setAtomicityMode(atomicityMode());
+ ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ ccfg.setCacheMode(cacheMode());
+ ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+ ccfg.setBackups(2);
+
+ final IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(ccfg);
+
+ final int KEYS = 10_000;
+
+ for (int i = 0; i < KEYS; i++)
+ cache.put(i, i);
+
+ final ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+ final AtomicBoolean err = new AtomicBoolean();
+
+ final AtomicInteger cntr = new AtomicInteger();
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+ @Override public void onUpdated(
+ final Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> cacheEntryEvts) {
+ try {
+ for (final CacheEntryEvent<? extends Integer, ? extends Integer> evt : cacheEntryEvts) {
+ final Integer oldVal = evt.getOldValue();
+
+ final Integer val = evt.getValue();
+
+ assertNotNull("No old value: " + evt, oldVal);
+ assertEquals("Unexpected old value: " + evt, (Integer)(oldVal + 1), val);
+
+ cntr.incrementAndGet();
+ }
+ }
+ catch (Throwable e) {
+ err.set(true);
+
+ error("Unexpected error: " + e, e);
+ }
+ }
+ });
+
+ final QueryCursor<Cache.Entry<Integer, Integer>> cur = cache.query(qry);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final IgniteInternalFuture<Object> updFut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ latch.await();
+
+ for (int i = 0; i < KEYS && !err.get(); i++)
+ cache.put(i, i + 1);
+
+ return null;
+ }
+ });
+
+ final IgniteInternalFuture<Object> rebFut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ latch.await();
+
+ for (int i = 2; i <= 5 && !err.get(); i++)
+ startGrid(i);
+
+ return null;
+ }
+ });
+
+ latch.countDown();
+
+ updFut.get();
+ rebFut.get();
+
+ assertFalse("Unexpected error during test", err.get());
+
+ assertTrue(cntr.get() > 0);
+
+ cur.close();
+
+ stopAllGrids();
+ }
+ }
+
+ /**
* @param ignite Ignite.
* @param topVer Topology version.
* @throws Exception If failed.
[02/12] ignite git commit: IGNITE-3635 - Fixed StackOverflowError
thrown from BinaryObject.toString()
Posted by vo...@apache.org.
IGNITE-3635 - Fixed StackOverflowError thrown from BinaryObject.toString()
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c0b2b479
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c0b2b479
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c0b2b479
Branch: refs/heads/ignite-1.6.8-hadoop
Commit: c0b2b4797be4f250f6f1304ff27d45c72154608a
Parents: 98914fe
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Fri Sep 16 14:59:35 2016 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Fri Sep 16 14:59:35 2016 -0700
----------------------------------------------------------------------
.../internal/binary/BinaryObjectExImpl.java | 161 +++++++++++++------
.../binary/BinaryObjectToStringSelfTest.java | 75 +++++++++
.../IgniteBinaryObjectsTestSuite.java | 2 +
3 files changed, 190 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c0b2b479/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
index b4e909e..e6df407 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
@@ -20,14 +20,16 @@ package org.apache.ignite.internal.binary;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.internal.util.typedef.internal.SB;
-import org.apache.ignite.binary.BinaryObjectException;
-import org.apache.ignite.binary.BinaryType;
-import org.apache.ignite.binary.BinaryObject;
import org.jetbrains.annotations.Nullable;
/**
@@ -164,6 +166,20 @@ public abstract class BinaryObjectExImpl implements BinaryObjectEx {
}
}
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ try {
+ BinaryReaderHandles ctx = new BinaryReaderHandles();
+
+ ctx.put(start(), this);
+
+ return toString(ctx, new IdentityHashMap<BinaryObject, Integer>());
+ }
+ catch (BinaryObjectException e) {
+ throw new IgniteException("Failed to create string representation of binary object.", e);
+ }
+ }
+
/**
* @param ctx Reader context.
* @param handles Handles for already traversed objects.
@@ -197,43 +213,7 @@ public abstract class BinaryObjectExImpl implements BinaryObjectEx {
buf.a(", ").a(name).a('=');
- if (val instanceof byte[])
- buf.a(Arrays.toString((byte[]) val));
- else if (val instanceof short[])
- buf.a(Arrays.toString((short[])val));
- else if (val instanceof int[])
- buf.a(Arrays.toString((int[])val));
- else if (val instanceof long[])
- buf.a(Arrays.toString((long[])val));
- else if (val instanceof float[])
- buf.a(Arrays.toString((float[])val));
- else if (val instanceof double[])
- buf.a(Arrays.toString((double[])val));
- else if (val instanceof char[])
- buf.a(Arrays.toString((char[])val));
- else if (val instanceof boolean[])
- buf.a(Arrays.toString((boolean[]) val));
- else if (val instanceof BigDecimal[])
- buf.a(Arrays.toString((BigDecimal[])val));
- else {
- if (val instanceof BinaryObjectExImpl) {
- BinaryObjectExImpl po = (BinaryObjectExImpl)val;
-
- Integer idHash0 = handles.get(val);
-
- if (idHash0 != null) { // Circular reference.
- BinaryType meta0 = po.rawType();
-
- assert meta0 != null;
-
- buf.a(meta0.typeName()).a(" [hash=").a(idHash0).a(", ...]");
- }
- else
- buf.a(po.toString(ctx, handles));
- }
- else
- buf.a(val);
- }
+ appendValue(val, buf, ctx, handles);
}
buf.a(']');
@@ -242,17 +222,102 @@ public abstract class BinaryObjectExImpl implements BinaryObjectEx {
return buf.toString();
}
- /** {@inheritDoc} */
- @Override public String toString() {
- try {
- BinaryReaderHandles ctx = new BinaryReaderHandles();
+ /**
+ * @param val Value to append.
+ * @param buf Buffer to append to.
+ * @param ctx Reader context.
+ * @param handles Handles for already traversed objects.
+ */
+ private void appendValue(Object val, SB buf, BinaryReaderHandles ctx,
+ IdentityHashMap<BinaryObject, Integer> handles) {
+ if (val instanceof byte[])
+ buf.a(Arrays.toString((byte[]) val));
+ else if (val instanceof short[])
+ buf.a(Arrays.toString((short[])val));
+ else if (val instanceof int[])
+ buf.a(Arrays.toString((int[])val));
+ else if (val instanceof long[])
+ buf.a(Arrays.toString((long[])val));
+ else if (val instanceof float[])
+ buf.a(Arrays.toString((float[])val));
+ else if (val instanceof double[])
+ buf.a(Arrays.toString((double[])val));
+ else if (val instanceof char[])
+ buf.a(Arrays.toString((char[])val));
+ else if (val instanceof boolean[])
+ buf.a(Arrays.toString((boolean[]) val));
+ else if (val instanceof BigDecimal[])
+ buf.a(Arrays.toString((BigDecimal[])val));
+ else if (val instanceof BinaryObjectExImpl) {
+ BinaryObjectExImpl po = (BinaryObjectExImpl)val;
+
+ Integer idHash0 = handles.get(val);
+
+ if (idHash0 != null) { // Circular reference.
+ BinaryType meta0 = po.rawType();
+
+ assert meta0 != null;
+
+ buf.a(meta0.typeName()).a(" [hash=").a(idHash0).a(", ...]");
+ }
+ else
+ buf.a(po.toString(ctx, handles));
+ }
+ else if (val instanceof Object[]) {
+ Object[] arr = (Object[])val;
- ctx.put(start(), this);
+ buf.a('[');
- return toString(ctx, new IdentityHashMap<BinaryObject, Integer>());
+ for (int i = 0; i < arr.length; i++) {
+ Object o = arr[i];
+
+ appendValue(o, buf, ctx, handles);
+
+ if (i < arr.length - 1)
+ buf.a(", ");
+ }
}
- catch (BinaryObjectException e) {
- throw new IgniteException("Failed to create string representation of binary object.", e);
+ else if (val instanceof Iterable) {
+ Iterable<Object> col = (Iterable<Object>)val;
+
+ buf.a(col.getClass().getSimpleName()).a(" {");
+
+ Iterator it = col.iterator();
+
+ while (it.hasNext()) {
+ Object o = it.next();
+
+ appendValue(o, buf, ctx, handles);
+
+ if (it.hasNext())
+ buf.a(", ");
+ }
+
+ buf.a('}');
+ }
+ else if (val instanceof Map) {
+ Map<Object, Object> map = (Map<Object, Object>)val;
+
+ buf.a(map.getClass().getSimpleName()).a(" {");
+
+ Iterator<Map.Entry<Object, Object>> it = map.entrySet().iterator();
+
+ while (it.hasNext()) {
+ Map.Entry<Object, Object> e = it.next();
+
+ appendValue(e.getKey(), buf, ctx, handles);
+
+ buf.a('=');
+
+ appendValue(e.getValue(), buf, ctx, handles);
+
+ if (it.hasNext())
+ buf.a(", ");
+ }
+
+ buf.a('}');
}
+ else
+ buf.a(val);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c0b2b479/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
new file mode 100644
index 0000000..cc6cf8b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
@@ -0,0 +1,75 @@
+package org.apache.ignite.internal.binary;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests for {@code BinaryObject.toString()}.
+ */
+public class BinaryObjectToStringSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setMarshaller(new BinaryMarshaller());
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGrid();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public void testToString() throws Exception {
+ MyObject obj = new MyObject();
+
+ obj.arr = new Object[] {111, "aaa", obj};
+ obj.col = Arrays.asList(222, "bbb", obj);
+
+ obj.map = new HashMap();
+
+ obj.map.put(10, 333);
+ obj.map.put(20, "ccc");
+ obj.map.put(30, obj);
+
+ BinaryObject bo = grid().binary().toBinary(obj);
+
+ // Check that toString() doesn't fail with StackOverflowError or other exceptions.
+ bo.toString();
+ }
+
+ /**
+ */
+ private static class MyObject {
+ /** Object array. */
+ private Object[] arr;
+
+ /** Collection. */
+ private Collection col;
+
+ /** Map. */
+ private Map map;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c0b2b479/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
index dc0540d..c1d9974 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.binary.BinaryMarshallerSelfTest;
import org.apache.ignite.internal.binary.BinaryObjectBuilderAdditionalSelfTest;
import org.apache.ignite.internal.binary.BinaryObjectBuilderDefaultMappersSelfTest;
import org.apache.ignite.internal.binary.BinaryObjectBuilderSimpleNameLowerCaseMappersSelfTest;
+import org.apache.ignite.internal.binary.BinaryObjectToStringSelfTest;
import org.apache.ignite.internal.binary.BinarySimpleNameTestPropertySelfTest;
import org.apache.ignite.internal.binary.BinaryTreeSelfTest;
import org.apache.ignite.internal.binary.GridBinaryAffinityKeySelfTest;
@@ -102,6 +103,7 @@ public class IgniteBinaryObjectsTestSuite extends TestSuite {
suite.addTestSuite(GridSimpleLowerCaseBinaryMappersBinaryMetaDataSelfTest.class);
suite.addTestSuite(GridBinaryAffinityKeySelfTest.class);
suite.addTestSuite(GridBinaryWildcardsSelfTest.class);
+ suite.addTestSuite(BinaryObjectToStringSelfTest.class);
// Tests for objects with non-compact footers.
suite.addTestSuite(BinaryMarshallerNonCompactSelfTest.class);
[10/12] ignite git commit: Added missing header to
BinaryObjectToStringSelfTest.
Posted by vo...@apache.org.
Added missing header to BinaryObjectToStringSelfTest.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/135f0a8a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/135f0a8a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/135f0a8a
Branch: refs/heads/ignite-1.6.8-hadoop
Commit: 135f0a8a39fb6895fada18d210260deebfb9426d
Parents: c1372ce
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Sep 21 10:33:11 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 21 10:33:11 2016 +0300
----------------------------------------------------------------------
.../binary/BinaryObjectToStringSelfTest.java | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/135f0a8a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
index cc6cf8b..df6bcde 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.ignite.internal.binary;
import java.util.Arrays;
[08/12] ignite git commit: ignite-3810 Fixed hang in FileSwapSpaceSpi
when too large value is stored
Posted by vo...@apache.org.
ignite-3810 Fixed hang in FileSwapSpaceSpi when too large value is stored
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/780bf23d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/780bf23d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/780bf23d
Branch: refs/heads/ignite-1.6.8-hadoop
Commit: 780bf23d5c89452dd062be4fab9e2e56d50bb9e2
Parents: 9b72d18
Author: sboikov <sb...@gridgain.com>
Authored: Mon Sep 19 18:19:33 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Sep 19 18:19:33 2016 +0300
----------------------------------------------------------------------
.../spi/swapspace/file/FileSwapSpaceSpi.java | 38 +++++++--
.../CacheSwapUnswapGetTestSmallQueueSize.java | 35 ++++++++
.../file/GridFileSwapSpaceSpiSelfTest.java | 89 ++++++++++++++++++++
.../testsuites/IgniteCacheTestSuite4.java | 2 +
4 files changed, 158 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index 8809f08..9be5b93 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -639,7 +639,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
if (space == null && create) {
validateName(name);
- Space old = spaces.putIfAbsent(masked, space = new Space(masked));
+ Space old = spaces.putIfAbsent(masked, space = new Space(masked, log));
if (old != null)
space = old;
@@ -833,13 +833,21 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
/** */
private final int maxSize;
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private boolean queueSizeWarn;
+
/**
* @param minTakeSize Min size.
* @param maxSize Max size.
+ * @param log Logger.
*/
- private SwapValuesQueue(int minTakeSize, int maxSize) {
+ private SwapValuesQueue(int minTakeSize, int maxSize, IgniteLogger log) {
this.minTakeSize = minTakeSize;
this.maxSize = maxSize;
+ this.log = log;
}
/**
@@ -852,8 +860,24 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
lock.lock();
try {
- while (size + val.len > maxSize)
- mayAdd.await();
+ boolean largeVal = val.len > maxSize;
+
+ if (largeVal) {
+ if (!queueSizeWarn) {
+ U.warn(log, "Trying to save in swap entry which have size more than write queue size. " +
+ "You may wish to increase 'maxWriteQueueSize' in FileSwapSpaceSpi configuration " +
+ "[queueMaxSize=" + maxSize + ", valSize=" + val.len + ']');
+
+ queueSizeWarn = true;
+ }
+
+ while (size >= minTakeSize)
+ mayAdd.await();
+ }
+ else {
+ while (size + val.len > maxSize)
+ mayAdd.await();
+ }
size += val.len;
@@ -1419,7 +1443,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
private SwapFile right;
/** */
- private final SwapValuesQueue que = new SwapValuesQueue(writeBufSize, maxWriteQueSize);
+ private final SwapValuesQueue que;
/** Partitions. */
private final ConcurrentMap<Integer, ConcurrentMap<SwapKey, SwapValue>> parts =
@@ -1442,11 +1466,13 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
/**
* @param name Space name.
+ * @param log Logger.
*/
- private Space(String name) {
+ private Space(String name, IgniteLogger log) {
assert name != null;
this.name = name;
+ this.que = new SwapValuesQueue(writeBufSize, maxWriteQueSize, log);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
new file mode 100644
index 0000000..8d189fe
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
+
+/**
+ * {@link CacheSwapUnswapGetTest} variant that runs with the file swap SPI's max write queue size set to 2.
+ */
+public class CacheSwapUnswapGetTestSmallQueueSize extends CacheSwapUnswapGetTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((FileSwapSpaceSpi)cfg.getSwapSpaceSpi()).setMaxWriteQueueSize(2);
+
+ return cfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
index 64652b1..ab21165 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
@@ -25,11 +25,14 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
@@ -37,8 +40,10 @@ import org.apache.ignite.spi.IgniteSpiCloseableIterator;
import org.apache.ignite.spi.swapspace.GridSwapSpaceSpiAbstractSelfTest;
import org.apache.ignite.spi.swapspace.SwapKey;
import org.apache.ignite.spi.swapspace.SwapSpaceSpi;
+import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import org.junit.Assert;
/**
* Test for {@link FileSwapSpaceSpi}.
@@ -364,4 +369,88 @@ public class GridFileSwapSpaceSpiSelfTest extends GridSwapSpaceSpiAbstractSelfTe
assertEquals(hash0, hash1);
}
+
+ /**
+ * Stores a value of {@code FileSwapSpaceSpi.DFLT_QUE_SIZE * 2} bytes and checks that the same bytes are
+ * read back. The store-and-read runs asynchronously via {@link #saveAndGet} and the test waits on the
+ * resulting future with a timeout of 10_000 (presumably milliseconds).
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testSaveValueLargeThenQueueSize() throws IgniteCheckedException {
+ final String spaceName = "mySpace";
+ final SwapKey key = new SwapKey("key");
+
+ final byte[] val = new byte[FileSwapSpaceSpi.DFLT_QUE_SIZE * 2];
+ Arrays.fill(val, (byte)1);
+
+ IgniteInternalFuture<byte[]> fut = GridTestUtils.runAsync(new Callable<byte[]>() {
+ @Override public byte[] call() throws Exception {
+ return saveAndGet(spaceName, key, val);
+ }
+ });
+
+ byte[] bytes = fut.get(10_000);
+
+ Assert.assertArrayEquals(val, bytes);
+ }
+
+ /**
+ * Runs 5 threads that keep storing byte arrays of random length in
+ * {@code [0, FileSwapSpaceSpi.DFLT_QUE_SIZE * 2)} under random keys until a stop flag is set. The main
+ * thread sleeps for 30 seconds, sets the flag and waits for the writers to finish. The test makes no
+ * explicit assertions.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSaveValueLargeThenQueueSizeMultiThreaded() throws Exception {
+ final String spaceName = "mySpace";
+
+ final int threads = 5;
+
+ long DURATION = 30_000;
+
+ final int maxSize = FileSwapSpaceSpi.DFLT_QUE_SIZE * 2;
+
+ final AtomicBoolean done = new AtomicBoolean();
+
+ try {
+ IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (!done.get()) {
+ SwapKey key = new SwapKey(rnd.nextInt(1000));
+
+ spi.store(spaceName, key, new byte[rnd.nextInt(0, maxSize)], context());
+ }
+
+ return null;
+ }
+ }, threads, " async-put");
+
+ Thread.sleep(DURATION);
+
+ done.set(true);
+
+ fut.get();
+ }
+ finally {
+ // Redundant on the normal path; makes sure the writer loops are told to stop if the try block exits early.
+ done.set(true);
+ }
+ }
+
+ /**
+ * Stores the value, polls until a read of the key returns non-null (waiting up to 10_000, presumably
+ * milliseconds), then reads the key once more and returns the result.
+ *
+ * @param spaceName Space name.
+ * @param key Key.
+ * @param val Value.
+ * @throws Exception If failed.
+ * @return Read bytes; asserted to be non-null.
+ */
+ private byte[] saveAndGet(final String spaceName, final SwapKey key, byte[] val) throws Exception {
+ spi.store(spaceName, key, val, context());
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return spi.read(spaceName, key, context()) != null;
+ }
+ }, 10_000);
+
+ byte[] res = spi.read(spaceName, key, context());
+
+ assertNotNull(res);
+
+ return res;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 60d59d7..c494e73 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeDynam
import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartAtomicTest;
import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartTxTest;
import org.apache.ignite.internal.processors.cache.CacheSwapUnswapGetTest;
+import org.apache.ignite.internal.processors.cache.CacheSwapUnswapGetTestSmallQueueSize;
import org.apache.ignite.internal.processors.cache.CacheTxNotAllowReadFromBackupTest;
import org.apache.ignite.internal.processors.cache.CrossCacheLockTest;
import org.apache.ignite.internal.processors.cache.GridCacheMarshallingNodeJoinSelfTest;
@@ -304,6 +305,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(CacheVersionedEntryReplicatedTransactionalOffHeapSelfTest.class);
suite.addTestSuite(CacheSwapUnswapGetTest.class);
+ suite.addTestSuite(CacheSwapUnswapGetTestSmallQueueSize.class);
suite.addTestSuite(GridCacheDhtTxPreloadSelfTest.class);
suite.addTestSuite(GridCacheNearTxPreloadSelfTest.class);
[03/12] ignite git commit: Merge remote-tracking branch
'community/ignite-1.6.8' into ignite-1.6.8
Posted by vo...@apache.org.
Merge remote-tracking branch 'community/ignite-1.6.8' into ignite-1.6.8
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c24cabaf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c24cabaf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c24cabaf
Branch: refs/heads/ignite-1.6.8-hadoop
Commit: c24cabafd69804b3ac8e2c08895c9b9b9597a7f3
Parents: c0b2b47 ebf354c
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Fri Sep 16 14:59:51 2016 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Fri Sep 16 14:59:51 2016 -0700
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 35 ++-
.../internal/GridEventConsumeHandler.java | 5 -
.../internal/GridMessageListenHandler.java | 5 -
.../cache/query/GridCacheQueryManager.java | 13 +-
.../continuous/CacheContinuousQueryHandler.java | 5 -
.../continuous/GridContinuousHandler.java | 8 -
.../continuous/GridContinuousProcessor.java | 33 +--
...eContinuousQueryMultiNodesFilteringTest.java | 161 ++++++++++++
.../hadoop/fs/BasicHadoopFileSystemFactory.java | 17 +-
.../processors/hadoop/HadoopClassLoader.java | 6 +-
.../processors/hadoop/HadoopClasspathMain.java | 2 +-
.../processors/hadoop/HadoopClasspathUtils.java | 230 +++++++++++++---
.../processors/hadoop/HadoopDefaultJobInfo.java | 1 -
.../internal/processors/hadoop/HadoopUtils.java | 53 ++--
.../processors/hadoop/v2/HadoopV2Job.java | 32 +--
.../hadoop/v2/HadoopV2JobResourceManager.java | 5 +-
.../hadoop/v2/HadoopV2TaskContext.java | 15 +-
.../igfs/HadoopFIleSystemFactorySelfTest.java | 10 -
.../processors/hadoop/HadoopTestUtils.java | 73 +++++-
.../hadoop/HadoopUserLibsSelfTest.java | 260 +++++++++++++++++++
.../testsuites/IgniteHadoopTestSuite.java | 3 +
...gniteCacheReplicatedFieldsQuerySelfTest.java | 6 +-
.../IgniteCacheReplicatedQuerySelfTest.java | 4 +-
23 files changed, 811 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
[11/12] ignite git commit: IGNITE-3635: Additional fix for stack
overflow in binary objects.
Posted by vo...@apache.org.
IGNITE-3635: Additional fix for stack overflow in binary objects.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/16b82b77
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/16b82b77
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/16b82b77
Branch: refs/heads/ignite-1.6.8-hadoop
Commit: 16b82b77f00dff8e525c8cc68d3387de107c78d1
Parents: 135f0a8
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Sep 21 12:35:07 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 21 12:35:07 2016 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/binary/BinaryObjectExImpl.java | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/16b82b77/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
index e6df407..063bd83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
@@ -30,6 +30,7 @@ import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
/**
@@ -248,6 +249,8 @@ public abstract class BinaryObjectExImpl implements BinaryObjectEx {
buf.a(Arrays.toString((boolean[]) val));
else if (val instanceof BigDecimal[])
buf.a(Arrays.toString((BigDecimal[])val));
+ else if (val instanceof IgniteUuid)
+ buf.a(val);
else if (val instanceof BinaryObjectExImpl) {
BinaryObjectExImpl po = (BinaryObjectExImpl)val;