You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/23 10:41:03 UTC

[01/12] ignite git commit: IGNITE-3907 Fixed "Incorrect initialization CQ when node filter configured for cache"

Repository: ignite
Updated Branches:
  refs/heads/ignite-1.6.8-hadoop ee5a818ba -> 4d08b5cb5


IGNITE-3907 Fixed "Incorrect initialization CQ when node filter configured for cache"


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ebf354c5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ebf354c5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ebf354c5

Branch: refs/heads/ignite-1.6.8-hadoop
Commit: ebf354c568d0802b7eed1cc6b9d251941dbce014
Parents: 2474e2b
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Fri Sep 16 14:32:13 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Fri Sep 16 14:32:13 2016 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       |   5 -
 .../internal/GridMessageListenHandler.java      |   5 -
 .../continuous/CacheContinuousQueryHandler.java |   5 -
 .../continuous/GridContinuousHandler.java       |   8 -
 .../continuous/GridContinuousProcessor.java     |  33 ++--
 ...eContinuousQueryMultiNodesFilteringTest.java | 161 +++++++++++++++++++
 6 files changed, 170 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index b4b1e58..ed6998d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -262,11 +262,6 @@ class GridEventConsumeHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
     @Override public void unregister(UUID routineId, GridKernalContext ctx) {
         assert routineId != null;
         assert ctx != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 2b8041d..1bca85c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -139,11 +139,6 @@ public class GridMessageListenHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
     @Override public void unregister(UUID routineId, GridKernalContext ctx) {
         ctx.io().removeUserMessageListener(topic, pred);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 7b3b47b..a5752ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -564,11 +564,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /** {@inheritDoc} */
-    @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
     @Override public void unregister(UUID routineId, GridKernalContext ctx) {
         assert routineId != null;
         assert ctx != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index c90746d..f14b450 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -57,14 +57,6 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
     public RegisterStatus register(UUID nodeId, UUID routineId, GridKernalContext ctx) throws IgniteCheckedException;
 
     /**
-     * Callback called after listener is registered and acknowledgement is sent.
-     *
-     * @param routineId Routine ID.
-     * @param ctx Kernal context.
-     */
-    public void onListenerRegistered(UUID routineId, GridKernalContext ctx);
-
-    /**
      * Unregisters listener.
      *
      * @param routineId Routine ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 5f61051..ad7ad4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -478,11 +478,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                     // Register handler only if local node passes projection predicate.
                     if ((item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) &&
-                        !locInfos.containsKey(item.routineId)) {
-                        if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval,
-                            item.autoUnsubscribe, false))
-                            item.hnd.onListenerRegistered(item.routineId, ctx);
-                    }
+                        !locInfos.containsKey(item.routineId))
+                        registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval,
+                            item.autoUnsubscribe, false);
 
                     if (!item.autoUnsubscribe)
                         // Register routine locally.
@@ -509,14 +507,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                                 ctx.resource().injectGeneric(info.prjPred);
 
                             if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
-                                if (registerHandler(clientNodeId,
+                                registerHandler(clientNodeId,
                                     routineId,
                                     info.hnd,
                                     info.bufSize,
                                     info.interval,
                                     info.autoUnsubscribe,
-                                    false))
-                                    info.hnd.onListenerRegistered(routineId, ctx);
+                                    false);
                             }
                         }
                         catch (IgniteCheckedException err) {
@@ -555,9 +552,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 GridContinuousHandler.RegisterStatus status = hnd.register(rmtInfo.nodeId, routineId, this.ctx);
 
                 assert status != GridContinuousHandler.RegisterStatus.DELAYED;
-
-                if (status == GridContinuousHandler.RegisterStatus.REGISTERED)
-                    hnd.onListenerRegistered(routineId, this.ctx);
             }
         }
     }
@@ -649,8 +643,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             try {
                 registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true);
 
-                hnd.onListenerRegistered(routineId, ctx);
-
                 return new GridFinishedFuture<>(routineId);
             }
             catch (IgniteCheckedException e) {
@@ -700,9 +692,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         startFuts.put(routineId, fut);
 
         try {
-            if (locIncluded
-                && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true))
-                hnd.onListenerRegistered(routineId, ctx);
+            if (locIncluded || hnd.isQuery())
+                registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true);
 
             ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData,
                 reqData.handler().keepBinary()));
@@ -1020,8 +1011,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 data.autoUnsubscribe()));
         }
 
-        boolean registered = false;
-
         if (err == null) {
             try {
                 IgnitePredicate<ClusterNode> prjPred = data.projectionPredicate();
@@ -1030,10 +1019,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                     ctx.resource().injectGeneric(prjPred);
 
                 if ((prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) &&
-                    !locInfos.containsKey(routineId)) {
-                    registered = registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(),
+                    !locInfos.containsKey(routineId))
+                    registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(),
                         data.autoUnsubscribe(), false);
-                }
 
                 if (!data.autoUnsubscribe())
                     // Register routine locally.
@@ -1061,9 +1049,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         if (err != null)
             req.addError(ctx.localNodeId(), err);
-
-        if (registered)
-            hnd0.onListenerRegistered(routineId, ctx);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
index 7000446..cf0c0d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
@@ -17,9 +17,17 @@
 
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 import javax.cache.configuration.Factory;
 import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
 import javax.cache.event.CacheEntryCreatedListener;
@@ -33,9 +41,12 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -45,8 +56,10 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jsr166.ConcurrentHashMap8;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /** */
 @SuppressWarnings("unchecked")
@@ -57,13 +70,21 @@ public class GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA
     /** */
     private static final int SERVER_GRIDS_COUNT = 6;
 
+    /** */
+    public static final int KEYS = 2_000;
+
     /** Cache entry operations' counts. */
     private static final ConcurrentMap<String, AtomicInteger> opCounts = new ConcurrentHashMap8<>();
 
+    /** Client. */
+    private static boolean client = false;
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         stopAllGrids();
 
+        client = false;
+
         super.afterTest();
     }
 
@@ -122,6 +143,108 @@ public class GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA
         }
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    public void testWithNodeFilter() throws Exception {
+        List<QueryCursor> qryCursors = new ArrayList<>();
+
+        final int nodesCnt = 3;
+
+        startGridsMultiThreaded(nodesCnt);
+
+        awaitPartitionMapExchange();
+
+        CacheConfiguration ccfg = cacheConfiguration(new NodeFilterByRegexp(".*(0|1)$"));
+
+        grid(0).createCache(ccfg);
+
+        final AtomicInteger cntr = new AtomicInteger();
+
+        final ConcurrentMap<ClusterNode, Set<Integer>> maps = new ConcurrentHashMap<>();
+
+        final AtomicBoolean doubleNtfFail = new AtomicBoolean(false);
+
+        CacheEntryUpdatedListener<Integer, Integer> lsnr = new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts)
+                throws CacheEntryListenerException {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    cntr.incrementAndGet();
+
+                    ClusterNode node = ((Ignite)e.getSource().unwrap(Ignite.class)).cluster().localNode();
+                    // Values already delivered to the listener on this node (created lazily, race-safe).
+                    Set<Integer> set = maps.get(node);
+
+                    if (set == null) {
+                        set = new ConcurrentSkipListSet<>();
+
+                        Set<Integer> oldVal = maps.putIfAbsent(node, set);
+
+                        set = oldVal != null ? oldVal : set;
+                    }
+                    // A value seen twice on the same node is a duplicate notification: raise the failure flag.
+                    if (!set.add(e.getValue()))
+                        doubleNtfFail.set(true);
+                }
+            }
+        };
+
+        for (int i = 0; i < nodesCnt; i++) {
+            ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+            qry.setLocalListener(lsnr);
+
+            Ignite ignite = grid(i);
+
+            log.info("Try to start CQ on node: " + ignite.cluster().localNode().id());
+
+            qryCursors.add(ignite.cache(ccfg.getName()).query(qry));
+
+            log.info("CQ started on node: " + ignite.cluster().localNode().id());
+        }
+
+        client = true;
+
+        startGrid(nodesCnt);
+
+        awaitPartitionMapExchange();
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        qryCursors.add(grid(nodesCnt).cache(ccfg.getName()).query(qry));
+
+        for (int i = 0; i <= nodesCnt; i++) {
+            for (int key = 0; key < KEYS; key++) {
+                int val = (i * KEYS) + key;
+
+                grid(i).cache(ccfg.getName()).put(val, val);
+            }
+        }
+
+        assertTrue(GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return cntr.get() >= 2 * (nodesCnt + 1) * KEYS;
+            }
+        }, 5000L));
+
+        assertFalse("Got duplicate", doubleNtfFail.get());
+
+        for (int i = 0; i < (nodesCnt + 1) * KEYS; i++) {
+            for (Map.Entry<ClusterNode, Set<Integer>> e : maps.entrySet())
+                assertTrue("Lost event on node: " + e.getKey().id() + ", event: " + i, e.getValue().remove(i));
+        }
+
+        for (Map.Entry<ClusterNode, Set<Integer>> e : maps.entrySet())
+            assertTrue("Unexpected event on node: " + e.getKey(), e.getValue().isEmpty());
+
+        assertEquals("Not expected count of CQ", nodesCnt + 1, qryCursors.size());
+
+        for (QueryCursor cur : qryCursors)
+            cur.close();
+    }
+
     /** */
     private Ignite startGrid(final int idx, boolean isClientMode) throws Exception {
         String gridName = getTestGridName(idx);
@@ -179,6 +302,28 @@ public class GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA
         return node;
     }
 
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /**
+     * @param filter Node filter.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(NodeFilterByRegexp filter) {
+        return new CacheConfiguration("test-cache-cq")
+            .setBackups(1)
+            .setNodeFilter(filter)
+            .setAtomicityMode(ATOMIC)
+            .setWriteSynchronizationMode(FULL_SYNC)
+            .setCacheMode(PARTITIONED);
+    }
+
     /** */
     private final static class ListenerConfiguration extends MutableCacheEntryListenerConfiguration {
         /** Operation. */
@@ -275,4 +420,20 @@ public class GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA
             return ((Integer)clusterNode.attributes().get("idx") % 2) == idx % 2;
         }
     }
+
+    /** */
+    private final static class NodeFilterByRegexp implements IgnitePredicate<ClusterNode> {
+        /** */
+        private final Pattern pattern;
+
+        /** */
+        private NodeFilterByRegexp(String regExp) {
+            this.pattern = Pattern.compile(regExp);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode clusterNode) {
+            return pattern.matcher(clusterNode.id().toString()).matches();
+        }
+    }
 }


[06/12] ignite git commit: IGNITE-1525 Return value for cache operation can be lost with onePhaseCommit

Posted by vo...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index e67e60f..a5b2202 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
@@ -44,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFini
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
@@ -175,6 +177,12 @@ public class IgniteTxHandler {
             }
         });
 
+        ctx.io().addHandler(0, GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() {
+            @Override public void apply(UUID nodeId, GridCacheMessage msg) {
+                processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg);
+            }
+        });
+
         ctx.io().addHandler(0, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse)msg);
@@ -882,7 +890,7 @@ public class IgniteTxHandler {
      * @param nodeId Sender node ID.
      * @param req Request.
      */
-    protected final void processDhtTxPrepareRequest(UUID nodeId, GridDhtTxPrepareRequest req) {
+    protected final void processDhtTxPrepareRequest(final UUID nodeId, final GridDhtTxPrepareRequest req) {
         if (txPrepareMsgLog.isDebugEnabled()) {
             txPrepareMsgLog.debug("Received dht prepare request [txId=" + req.nearXidVersion() +
                 ", dhtTxId=" + req.version() +
@@ -918,14 +926,15 @@ public class IgniteTxHandler {
 
                 if (dhtTx != null) {
                     dhtTx.onePhaseCommit(true);
+                    dhtTx.needReturnValue(req.needReturnValue());
 
-                    finish(nodeId, dhtTx, req);
+                    finish(dhtTx, req);
                 }
 
                 if (nearTx != null) {
                     nearTx.onePhaseCommit(true);
 
-                    finish(nodeId, nearTx, req);
+                    finish(nearTx, req);
                 }
             }
         }
@@ -950,38 +959,60 @@ public class IgniteTxHandler {
                 req.deployInfo() != null);
         }
 
-        try {
-            // Reply back to sender.
-            ctx.io().send(nodeId, res, req.policy());
+        if (req.onePhaseCommit()) {
+            IgniteInternalFuture completeFut;
 
-            if (txPrepareMsgLog.isDebugEnabled()) {
-                txPrepareMsgLog.debug("Sent dht prepare response [txId=" + req.nearXidVersion() +
-                    ", dhtTxId=" + req.version() +
-                    ", node=" + nodeId + ']');
-            }
-        }
-        catch (IgniteCheckedException e) {
-            if (e instanceof ClusterTopologyCheckedException) {
-                if (txPrepareMsgLog.isDebugEnabled()) {
-                    txPrepareMsgLog.debug("Failed to send dht prepare response, node left [txId=" + req.nearXidVersion() +
-                        ", dhtTxId=" + req.version() +
-                        ", node=" + nodeId + ']');
-                }
-            }
-            else {
-                U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [" +
-                    "txId=" + req.nearXidVersion() +
-                    ", dhtTxId=" + req.version() +
-                    ", node=" + nodeId +
-                    ", err=" + e.getMessage() + ']');
+            IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ?
+                null : dhtTx.done() ? null : dhtTx.finishFuture();
+
+            final IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ?
+                null : nearTx.done() ? null : nearTx.finishFuture();
+
+            if (dhtFin != null && nearFin != null) {
+                GridCompoundFuture fut = new GridCompoundFuture();
+
+                fut.add(dhtFin);
+                fut.add(nearFin);
+
+                fut.markInitialized();
+
+                completeFut = fut;
             }
+            else
+                completeFut = dhtFin != null ? dhtFin : nearFin;
 
-            if (nearTx != null)
-                nearTx.rollback();
+            if (completeFut != null) {
+                final GridDhtTxPrepareResponse res0 = res;
+                final GridDhtTxRemote dhtTx0 = dhtTx;
+                final GridNearTxRemote nearTx0 = nearTx;
 
-            if (dhtTx != null)
-                dhtTx.rollback();
+                completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+                    @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+                        sendReply(nodeId, req, res0, dhtTx0, nearTx0);
+                    }
+                });
+            }
+            else
+                sendReply(nodeId, req, res, dhtTx, nearTx);
         }
+        else
+            sendReply(nodeId, req, res, dhtTx, nearTx);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    protected final void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId,
+        final GridDhtTxOnePhaseCommitAckRequest req) {
+        assert nodeId != null;
+        assert req != null;
+
+        if (log.isDebugEnabled())
+            log.debug("Processing dht tx one phase commit ack request [nodeId=" + nodeId + ", req=" + req + ']');
+
+        for (GridCacheVersion ver : req.versions())
+            ctx.tm().removeTxReturn(ver);
     }
 
     /**
@@ -1139,12 +1170,10 @@ public class IgniteTxHandler {
     }
 
     /**
-     * @param nodeId Node ID.
      * @param tx Transaction.
      * @param req Request.
      */
     protected void finish(
-        UUID nodeId,
         GridDistributedTxRemoteAdapter tx,
         GridDhtTxPrepareRequest req) throws IgniteTxHeuristicCheckedException {
         assert tx != null : "No transaction for one-phase commit prepare request: " + req;
@@ -1177,6 +1206,52 @@ public class IgniteTxHandler {
     }
 
     /**
+     * @param nodeId Node id.
+     * @param req Request.
+     * @param res Response.
+     * @param dhtTx Dht tx.
+     * @param nearTx Near tx.
+     */
+    protected void sendReply(UUID nodeId,
+        GridDhtTxPrepareRequest req,
+        GridDhtTxPrepareResponse res,
+        GridDhtTxRemote dhtTx,
+        GridNearTxRemote nearTx) {
+        try {
+            // Reply back to sender.
+            ctx.io().send(nodeId, res, req.policy());
+
+            if (txPrepareMsgLog.isDebugEnabled()) {
+                txPrepareMsgLog.debug("Sent dht prepare response [txId=" + req.nearXidVersion() +
+                    ", dhtTxId=" + req.version() +
+                    ", node=" + nodeId + ']');
+            }
+        }
+        catch (IgniteCheckedException e) {
+            if (e instanceof ClusterTopologyCheckedException) {
+                if (txPrepareMsgLog.isDebugEnabled()) {
+                    txPrepareMsgLog.debug("Failed to send dht prepare response, node left [txId=" + req.nearXidVersion() +
+                        ", dhtTxId=" + req.version() +
+                        ", node=" + nodeId + ']');
+                }
+            }
+            else {
+                U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [" +
+                    "txId=" + req.nearXidVersion() +
+                    ", dhtTxId=" + req.version() +
+                    ", node=" + nodeId +
+                    ", err=" + e.getMessage() + ']');
+            }
+
+            if (nearTx != null)
+                nearTx.rollback();
+
+            if (dhtTx != null)
+                dhtTx.rollback();
+        }
+    }
+
+    /**
      * Sends tx finish response to remote node, if response is requested.
      *
      * @param nodeId Node id that originated finish request.
@@ -1191,7 +1266,26 @@ public class IgniteTxHandler {
             if (req.checkCommitted()) {
                 res.checkCommitted(true);
 
-                if (!committed) {
+                if (committed) {
+                    if (req.needReturnValue()) {
+                        try {
+                            GridCacheReturnCompletableWrapper wrapper = ctx.tm().getCommittedTxReturn(req.version());
+
+                            if (wrapper != null)
+                                res.returnValue(wrapper.fut().get());
+                            else
+                                assert !ctx.discovery().alive(nodeId) : nodeId;
+                        }
+                        catch (IgniteCheckedException e) {
+                            if (txFinishMsgLog.isDebugEnabled()) {
+                                txFinishMsgLog.debug("Failed to gain entry processor return value. [txId=" + nearTxId +
+                                    ", dhtTxId=" + req.version() +
+                                    ", node=" + nodeId + ']');
+                            }
+                        }
+                    }
+                }
+                else {
                     ClusterTopologyCheckedException cause =
                         new ClusterTopologyCheckedException("Primary node left grid.");
 
@@ -1492,8 +1586,7 @@ public class IgniteTxHandler {
      * @param req Request.
      */
     protected void processCheckPreparedTxRequest(final UUID nodeId,
-        final GridCacheTxRecoveryRequest req)
-    {
+        final GridCacheTxRecoveryRequest req) {
         if (txRecoveryMsgLog.isDebugEnabled()) {
             txRecoveryMsgLog.debug("Received tx recovery request [txId=" + req.nearXidVersion() +
                 ", node=" + nodeId + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 637f322..fe69536 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -151,9 +151,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     /** Commit error. */
     protected volatile Throwable commitErr;
 
-    /** Need return value. */
-    protected boolean needRetVal;
-
     /** Implicit transaction result. */
     protected GridCacheReturn implicitRes;
 
@@ -355,13 +352,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     }
 
     /**
-     * @return Flag indicating whether transaction needs return value.
-     */
-    public boolean needReturnValue() {
-        return needRetVal;
-    }
-
-    /**
      * @return {@code True} if transaction participates in a cache that has an interceptor configured.
      */
     public boolean hasInterceptor() {
@@ -369,13 +359,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     }
 
     /**
-     * @param needRetVal Need return value flag.
-     */
-    public void needReturnValue(boolean needRetVal) {
-        this.needRetVal = needRetVal;
-    }
-
-    /**
      * @param snd {@code True} if values in tx entries should be replaced with transformed values and sent
      * to remote nodes.
      */
@@ -703,7 +686,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                         txEntry.cached().unswap(false);
 
                                     IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(txEntry,
-                                        true);
+                                        true, null);
 
                                     GridCacheVersion dhtVer = null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index f9357f9..a1580a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -39,6 +39,7 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -49,7 +50,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
@@ -57,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLo
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
@@ -87,8 +92,11 @@ import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
 import org.jsr166.ConcurrentLinkedHashMap;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS;
@@ -123,6 +131,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     /** Tx salvage timeout (default 3s). */
     private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(IGNITE_TX_SALVAGE_TIMEOUT, 100);
 
+    /** One phase commit deferred ack request timeout. */
+    public static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT =
+        Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT, 500);
+
+    /** One phase commit deferred ack request buffer size. */
+    private static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE =
+        Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE, 256);
+
     /** Version in which deadlock detection introduced. */
     public static final IgniteProductVersion TX_DEADLOCK_DETECTION_SINCE = IgniteProductVersion.fromString("1.5.19");
 
@@ -160,7 +176,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT));
 
     /** Committed local transactions. */
-    private final ConcurrentLinkedHashMap<GridCacheVersion, Boolean> completedVersHashMap =
+    private final ConcurrentLinkedHashMap<GridCacheVersion, Object> completedVersHashMap =
         new ConcurrentLinkedHashMap<>(
             Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT),
             0.75f,
@@ -168,6 +184,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT),
             PER_SEGMENT_Q);
 
+    /** Pending one phase commit ack requests sender. */
+    private GridDeferredAckMessageSender deferredAckMessageSender;
+
     /** Transaction finish synchronizer. */
     private GridCacheTxFinishSync txFinishSync;
 
@@ -209,6 +228,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
                     for (TxDeadlockFuture fut : deadlockDetectFuts.values())
                         fut.onNodeLeft(nodeId);
+
+                    for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) {
+                        Object obj = entry.getValue();
+
+                        if (obj instanceof GridCacheReturnCompletableWrapper &&
+                            nodeId.equals(((GridCacheReturnCompletableWrapper)obj).nodeId()))
+                            removeTxReturn(entry.getKey());
+                    }
                 }
             },
             EVT_NODE_FAILED, EVT_NODE_LEFT);
@@ -237,6 +264,33 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         txFinishSync = new GridCacheTxFinishSync<>(cctx);
 
         txHnd = new IgniteTxHandler(cctx);
+
+        deferredAckMessageSender = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) {
+            @Override public int getTimeout() {
+                return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
+            }
+
+            @Override public int getBufferSize() {
+                return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE;
+            }
+
+            @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers) {
+                GridDhtTxOnePhaseCommitAckRequest ackReq = new GridDhtTxOnePhaseCommitAckRequest(vers);
+
+                cctx.kernalContext().gateway().readLock();
+
+                try {
+                    cctx.io().send(nodeId, ackReq, GridIoPolicy.SYSTEM_POOL);
+                }
+                catch (IgniteCheckedException e) {
+                    log.error("Failed to send one phase commit ack to backup node [backup=" +
+                        nodeId + ']', e);
+                }
+                finally {
+                    cctx.kernalContext().gateway().readUnlock();
+                }
+            }
+        };
     }
 
     /** {@inheritDoc} */
@@ -898,9 +952,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      */
     public void addCommittedTx(IgniteInternalTx tx) {
         addCommittedTx(tx, tx.xidVersion(), tx.nearXidVersion());
+    }
 
-        if (!tx.local() && !tx.near() && tx.onePhaseCommit())
-            addCommittedTx(tx, tx.nearXidVersion(), null);
+    /**
+     * @param tx Committed transaction.
+     */
+    public void addCommittedTxReturn(IgniteInternalTx tx, GridCacheReturnCompletableWrapper ret) {
+        addCommittedTxReturn(tx.nearXidVersion(), null, ret);
     }
 
     /**
@@ -925,7 +983,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         if (nearXidVer != null)
             xidVer = new CommittedVersion(xidVer, nearXidVer);
 
-        Boolean committed0 = completedVersHashMap.putIfAbsent(xidVer, true);
+        Object committed0 = completedVersHashMap.putIfAbsent(xidVer, true);
 
         if (committed0 == null && (tx == null || tx.needsCompletedVersions())) {
             Boolean b = completedVersSorted.putIfAbsent(xidVer, true);
@@ -933,7 +991,29 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             assert b == null;
         }
 
-        return committed0 == null || committed0;
+        Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE);
+
+        return committed0 == null || committed;
+    }
+
+    /**
+     * @param xidVer Completed transaction version.
+     * @param nearXidVer Optional near transaction ID.
+     * @param retVal Invoke result.
+     */
+    private void addCommittedTxReturn(
+        GridCacheVersion xidVer,
+        @Nullable GridCacheVersion nearXidVer,
+        GridCacheReturnCompletableWrapper retVal
+    ) {
+        assert retVal != null;
+
+        if (nearXidVer != null)
+            xidVer = new CommittedVersion(xidVer, nearXidVer);
+
+        Object prev = completedVersHashMap.putIfAbsent(xidVer, retVal);
+
+        assert prev == null || Boolean.FALSE.equals(prev) : prev; // Can be rolled back.
     }
 
     /**
@@ -945,7 +1025,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         IgniteInternalTx tx,
         GridCacheVersion xidVer
     ) {
-        Boolean committed0 = completedVersHashMap.putIfAbsent(xidVer, false);
+        Object committed0 = completedVersHashMap.putIfAbsent(xidVer, false);
 
         if (committed0 == null && (tx == null || tx.needsCompletedVersions())) {
             Boolean b = completedVersSorted.putIfAbsent(xidVer, false);
@@ -953,7 +1033,47 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             assert b == null;
         }
 
-        return committed0 == null || !committed0;
+        Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE);
+
+        return committed0 == null || !committed;
+    }
+
+    /**
+     * @param xidVer Completed transaction version.
+     * @return Tx result.
+     */
+    public GridCacheReturnCompletableWrapper getCommittedTxReturn(GridCacheVersion xidVer) {
+        Object retVal = completedVersHashMap.get(xidVer);
+
+        // Holds true in the regular case or a GridCacheReturnCompletableWrapper in the onePhaseCommit case.
+        if (!Boolean.TRUE.equals(retVal)) {
+            assert !Boolean.FALSE.equals(retVal); // Method should be used only after 'committed' checked.
+
+            GridCacheReturnCompletableWrapper res = (GridCacheReturnCompletableWrapper)retVal;
+
+            removeTxReturn(xidVer);
+
+            return res;
+        }
+        else
+            return null;
+    }
+
+    /**
+     * @param xidVer Completed transaction version.
+     */
+    public void removeTxReturn(GridCacheVersion xidVer) {
+        Object prev = completedVersHashMap.get(xidVer);
+
+        if (Boolean.FALSE.equals(prev)) // Tx can be rolled back.
+            return;
+
+        assert prev instanceof GridCacheReturnCompletableWrapper:
+            prev + " instead of GridCacheReturnCompletableWrapper";
+
+        boolean res = completedVersHashMap.replace(xidVer, prev, true);
+
+        assert res;
     }
 
     /**
@@ -1086,7 +1206,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
          * so we don't do it here.
          */
 
-        Boolean committed = completedVersHashMap.get(tx.xidVersion());
+        Object committed0 = completedVersHashMap.get(tx.xidVersion());
+
+        Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE);
 
         // 1. Make sure that committed version has been recorded.
         if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
@@ -1672,12 +1794,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
         boolean committed = false;
 
-        for (Map.Entry<GridCacheVersion, Boolean> entry : completedVersHashMap.entrySet()) {
+        for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) {
             if (entry.getKey() instanceof CommittedVersion) {
                 CommittedVersion comm = (CommittedVersion)entry.getKey();
 
                 if (comm.nearVer.equals(xidVer)) {
-                    committed = entry.getValue();
+                    committed = !entry.getValue().equals(Boolean.FALSE);
 
                     break;
                 }
@@ -1809,8 +1931,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
         // Not all transactions were found. Need to scan committed versions to check
         // if transaction was already committed.
-        for (Map.Entry<GridCacheVersion, Boolean> e : completedVersHashMap.entrySet()) {
-            if (!e.getValue())
+        for (Map.Entry<GridCacheVersion, Object> e : completedVersHashMap.entrySet()) {
+            if (e.getValue().equals(Boolean.FALSE))
                 continue;
 
             GridCacheVersion ver = e.getKey();
@@ -2137,6 +2259,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param nodeId Node ID to send message to.
+     * @param ver Version to ack.
+     */
+    public void sendDeferredAckResponse(UUID nodeId, GridCacheVersion ver) {
+        deferredAckMessageSender.sendDeferredAckMessage(nodeId, ver);
+    }
+
+    /**
      * @return Collection of active transaction deadlock detection futures.
      */
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index e611723..c3d194b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,7 +31,6 @@ import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
 import javax.cache.processor.EntryProcessorResult;
 import javax.cache.processor.MutableEntry;
-
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
@@ -43,8 +43,10 @@ import org.apache.ignite.configuration.AtomicConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -59,6 +61,7 @@ import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager.DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
 import static org.apache.ignite.testframework.GridTestUtils.TestMemoryMode;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 
@@ -70,7 +73,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
     /** */
-    private static final long DURATION = 60_000;
+    protected static final long DURATION = 60_000;
 
     /** */
     protected static final int GRID_CNT = 4;
@@ -78,8 +81,8 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
     /**
      * @return Keys count for the test.
      */
-    private int keysCount() {
-        return 10_000;
+    protected int keysCount() {
+        return 2_000;
     }
 
     /**
@@ -249,12 +252,17 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
 
         IgniteInternalFuture<Object> fut = runAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {
+                Random rnd = new Random();
+
                 while (!finished.get()) {
                     stopGrid(3);
 
                     U.sleep(300);
 
                     startGrid(3);
+
+                    if (rnd.nextBoolean()) // OPC possible only when there is no migration from one backup to another.
+                        awaitPartitionMapExchange();
                 }
 
                 return null;
@@ -456,6 +464,29 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
 
             assertTrue("Unexpected atomic futures: " + futs, futs.isEmpty());
         }
+
+        checkOnePhaseCommitReturnValuesCleaned();
+    }
+
+    /**
+     * Checks that every value in each node's {@code completedVersHashMap} is a {@code Boolean}.
+     */
+    protected void checkOnePhaseCommitReturnValuesCleaned() throws IgniteInterruptedCheckedException {
+        U.sleep(DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT);
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            IgniteKernal ignite = (IgniteKernal)grid(i);
+
+            IgniteTxManager tm = ignite.context().cache().context().tm();
+
+            Map completedVersHashMap = U.field(tm, "completedVersHashMap");
+
+            for (Object o : completedVersHashMap.values()) {
+                assertTrue("completedVersHashMap contains" + o.getClass() + " instead of boolean. " +
+                    "These values should be replaced by boolean after onePhaseCommit finished. " +
+                    "[node=" + i + "]", o instanceof Boolean);
+            }
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index 9204bc8..9bfde27 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.HashSet;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
@@ -88,16 +89,6 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public void testGetAndPut() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-1525");
-    }
-
-    /** {@inheritDoc} */
-    @Override public void testInvoke() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-1525");
-    }
-
     /**
      * @throws Exception If failed.
      */
@@ -217,6 +208,70 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
     }
 
     /**
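+     * Restarts node 0 while invoking entry processors, then checks one-phase-commit return values are cleaned up.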
+     */
+    public void testOriginatingNodeFailureForcesOnePhaseCommitDataCleanup() throws Exception {
+        ignite(0).createCache(cacheConfiguration(TestMemoryMode.HEAP, false));
+
+        final AtomicBoolean finished = new AtomicBoolean();
+
+        final int keysCnt = keysCount();
+
+        IgniteInternalFuture<Object> fut = runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                Random rnd = new Random();
+
+                while (!finished.get()) {
+                    stopGrid(0);
+
+                    U.sleep(300);
+
+                    startGrid(0);
+
+                    if (rnd.nextBoolean()) // OPC possible only when there is no migration from one backup to another.
+                        awaitPartitionMapExchange();
+                }
+
+                return null;
+            }
+        });
+
+        IgniteInternalFuture<Object> fut2 = runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                int iter = 0;
+
+                while (!finished.get()) {
+                    try {
+                        IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
+
+                        Integer val = ++iter;
+
+                        for (int i = 0; i < keysCnt; i++)
+                            cache.invoke(i, new SetEntryProcessor(val));
+                    }
+                    catch (Exception e) {
+                        // No-op.
+                    }
+                }
+
+                return null;
+            }
+        });
+
+        try {
+            U.sleep(DURATION);
+        }
+        finally {
+            finished.set(true);
+
+            fut.get();
+            fut2.get();
+        }
+
+        checkOnePhaseCommitReturnValuesCleaned();
+    }
+
+    /**
      * Callable to process inside transaction.
      */
     private static class ProcessCallable implements Callable<Void> {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-client-mode.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-client-mode.properties b/modules/yardstick/config/benchmark-client-mode.properties
index ba5525f..f7c8347 100644
--- a/modules/yardstick/config/benchmark-client-mode.properties
+++ b/modules/yardstick/config/benchmark-client-mode.properties
@@ -70,6 +70,8 @@ CONFIGS="\
 -cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutBenchmark -sn IgniteNode -ds ${ver}atomic-put-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutGetBenchmark -sn IgniteNode -ds ${ver}atomic-put-get-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds ${ver}tx-put-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds ${ver}tx-getAndPut-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds ${ver}tx-invoke-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds ${ver}tx-put-get-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteSqlQueryBenchmark -sn IgniteNode -ds ${ver}sql-query-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteSqlQueryJoinBenchmark -sn IgniteNode -ds ${ver}sql-query-join-1-backup,\

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-tx-win.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-tx-win.properties b/modules/yardstick/config/benchmark-tx-win.properties
index 73b857d..54a40b1 100644
--- a/modules/yardstick/config/benchmark-tx-win.properties
+++ b/modules/yardstick/config/benchmark-tx-win.properties
@@ -54,6 +54,8 @@ set DRIVER_HOSTS=localhost
 :: Note that each benchmark is set to run for 300 seconds (5 mins) with warm-up set to 60 seconds (1 minute).
 set CONFIGS=^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds tx-put-1-backup,^
+-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds tx-getAndPut-1-backup,^
+-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds tx-invoke-1-backup,^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapBenchmark -sn IgniteNode -ds tx-put-offheap-1-backup,^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapValuesBenchmark -sn IgniteNode -ds tx-put-offheap-val-1-backup,^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds tx-put-get-1-backup,^

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-tx.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-tx.properties b/modules/yardstick/config/benchmark-tx.properties
index f3dbc24..0d5bb02 100644
--- a/modules/yardstick/config/benchmark-tx.properties
+++ b/modules/yardstick/config/benchmark-tx.properties
@@ -59,6 +59,8 @@ nodesNum=$((`echo ${SERVER_HOSTS} | tr ',' '\n' | wc -l` + `echo ${DRIVER_HOSTS}
 # Note that each benchmark is set to run for 300 seconds (5 mins) with warm-up set to 60 seconds (1 minute).
 CONFIGS="\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds tx-put-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds tx-getAndPut-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds tx-invoke-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapBenchmark -sn IgniteNode -ds tx-put-offheap-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapValuesBenchmark -sn IgniteNode -ds tx-put-offheap-val-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds tx-put-get-1-backup,\

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-win.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-win.properties b/modules/yardstick/config/benchmark-win.properties
index b6ecd67..b75b5d6 100644
--- a/modules/yardstick/config/benchmark-win.properties
+++ b/modules/yardstick/config/benchmark-win.properties
@@ -59,6 +59,8 @@ set CONFIGS=^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutBenchmark -sn IgniteNode -ds atomic-put-1-backup,^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetBenchmark -sn IgniteNode -ds atomic-put-get-1-backup,^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds tx-put-1-backup,^
+-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds tx-getAndPut-1-backup,^
+-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds tx-invoke-1-backup,^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds tx-put-get-1-backup,^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryBenchmark -sn IgniteNode -ds sql-query-1-backup,^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryJoinBenchmark -sn IgniteNode -ds sql-query-join-1-backup,^

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark.properties b/modules/yardstick/config/benchmark.properties
index 67ef5ef..cfc1499 100644
--- a/modules/yardstick/config/benchmark.properties
+++ b/modules/yardstick/config/benchmark.properties
@@ -71,6 +71,8 @@ CONFIGS="\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutBenchmark -sn IgniteNode -ds ${ver}atomic-put-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetBenchmark -sn IgniteNode -ds ${ver}atomic-put-get-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds ${ver}tx-put-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds ${ver}tx-getAndPut-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds ${ver}tx-invoke-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds ${ver}tx-put-get-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryBenchmark -sn IgniteNode -ds ${ver}sql-query-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryJoinBenchmark -sn IgniteNode -ds ${ver}sql-query-join-1-backup,\

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
new file mode 100644
index 0000000..40e563c
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+
+/**
+ * Ignite benchmark that performs getAndPut operations.
+ */
+public class IgniteGetAndPutBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        int key = nextRandom(args.range());
+
+        cache.getAndPut(key, new SampleValue(key));
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteCache<Integer, Object> cache() {
+        return ignite().cache("atomic");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
new file mode 100644
index 0000000..49ae985
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.yardstick.IgniteBenchmarkUtils;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
+
+/**
+ * Ignite benchmark that performs transactional getAndPut operations.
+ */
+public class IgniteGetAndPutTxBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
+    /** */
+    private IgniteTransactions txs;
+
+    /** */
+    private Callable<Void> clo;
+
+    /** {@inheritDoc} */
+    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+
+        if (!IgniteSystemProperties.getBoolean("SKIP_MAP_CHECK"))
+            ignite().compute().broadcast(new WaitMapExchangeFinishCallable());
+
+        txs = ignite().transactions();
+
+        clo = new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                int key = nextRandom(args.range());
+
+                cache.getAndPut(key, new SampleValue(key));
+
+                return null;
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        IgniteBenchmarkUtils.doInTransaction(txs, args.txConcurrency(), args.txIsolation(), clo);
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteCache<Integer, Object> cache() {
+        return ignite().cache("tx");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java
index 8f05598..64dc6b8 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java
@@ -17,12 +17,52 @@
 
 package org.apache.ignite.yardstick.cache;
 
+import java.util.Map;
+import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.yardstick.IgniteBenchmarkUtils;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
 
 /**
  * Ignite benchmark that performs invoke operations.
  */
 public class IgniteInvokeTxBenchmark extends IgniteInvokeBenchmark {
+    /** */
+    private IgniteTransactions txs;
+
+    /** */
+    private Callable<Void> clo;
+
+    /** {@inheritDoc} */
+    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+
+        if (!IgniteSystemProperties.getBoolean("SKIP_MAP_CHECK"))
+            ignite().compute().broadcast(new WaitMapExchangeFinishCallable());
+
+        txs = ignite().transactions();
+
+        clo = new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                int key = nextRandom(args.range());
+
+                cache.invoke(key, new SetValueEntryProcessor(new SampleValue(key)));
+
+                return null;
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        IgniteBenchmarkUtils.doInTransaction(txs, args.txConcurrency(), args.txIsolation(), clo);
+
+        return true;
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteCache<Integer, Object> cache() {
         return ignite().cache("tx");


[07/12] ignite git commit: IGNITE-1525 Return value for cache operation can be lost with onePhaseCommit

Posted by vo...@apache.org.
IGNITE-1525 Return value for cache operation can be lost with onePhaseCommit


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9b72d18d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9b72d18d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9b72d18d

Branch: refs/heads/ignite-1.6.8-hadoop
Commit: 9b72d18dd94ec1383653f00474c102804c02790a
Parents: c3eff6b
Author: Anton Vinogradov <av...@apache.org>
Authored: Mon Sep 19 18:07:20 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Mon Sep 19 18:07:20 2016 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |  12 +
 .../communication/GridIoMessageFactory.java     |   6 +
 .../GridCacheReturnCompletableWrapper.java      | 101 +++++++++
 .../cache/GridDeferredAckMessageSender.java     | 219 ++++++++++++++++++
 .../GridDistributedTxRemoteAdapter.java         |  59 +++--
 .../distributed/dht/GridDhtTxFinishFuture.java  |  12 +-
 .../distributed/dht/GridDhtTxFinishRequest.java |  33 ++-
 .../dht/GridDhtTxFinishResponse.java            |  52 ++++-
 .../dht/GridDhtTxOnePhaseCommitAckRequest.java  | 134 +++++++++++
 .../distributed/dht/GridDhtTxPrepareFuture.java |   6 +-
 .../dht/GridDhtTxPrepareRequest.java            |  93 +++++---
 .../cache/distributed/dht/GridDhtTxRemote.java  |   6 +-
 .../dht/atomic/GridDhtAtomicCache.java          | 227 +++++--------------
 ...arOptimisticSerializableTxPrepareFuture.java |   4 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   7 +-
 .../GridNearPessimisticTxPrepareFuture.java     |   4 +-
 .../near/GridNearTxFinishFuture.java            | 112 +++++++--
 .../cache/transactions/IgniteTxAdapter.java     |  46 +++-
 .../cache/transactions/IgniteTxHandler.java     | 163 ++++++++++---
 .../transactions/IgniteTxLocalAdapter.java      |  19 +-
 .../cache/transactions/IgniteTxManager.java     | 154 ++++++++++++-
 .../IgniteCachePutRetryAbstractSelfTest.java    |  39 +++-
 ...gniteCachePutRetryTransactionalSelfTest.java |  75 +++++-
 .../config/benchmark-client-mode.properties     |   2 +
 .../config/benchmark-tx-win.properties          |   2 +
 .../yardstick/config/benchmark-tx.properties    |   2 +
 .../yardstick/config/benchmark-win.properties   |   2 +
 modules/yardstick/config/benchmark.properties   |   2 +
 .../cache/IgniteGetAndPutBenchmark.java         |  41 ++++
 .../cache/IgniteGetAndPutTxBenchmark.java       |  70 ++++++
 .../cache/IgniteInvokeTxBenchmark.java          |  40 ++++
 31 files changed, 1405 insertions(+), 339 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 7c428a6..ab6403f 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -290,6 +290,18 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT = "IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT";
 
     /**
+     * One phase commit deferred ack request timeout.
+     */
+    public static final String IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT =
+        "IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT";
+
+    /**
+     * One phase commit deferred ack request buffer size.
+     */
+    public static final String IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE =
+        "IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE";
+
+    /**
      * If this property set then debug console will be opened for H2 indexing SPI.
      */
     public static final String IGNITE_H2_DEBUG_CONSOLE = "IGNITE_H2_DEBUG_CONSOLE";

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 5f60215..8b8a734 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRe
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
@@ -160,6 +161,11 @@ public class GridIoMessageFactory implements MessageFactory {
         Message msg = null;
 
         switch (type) {
+            case -27:
+                msg = new GridDhtTxOnePhaseCommitAckRequest();
+
+                break;
+
             case -26:
                 msg = new TxLockList();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java
new file mode 100644
index 0000000..8ceaf71
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides initialized GridCacheReturn.
+ */
+public class GridCacheReturnCompletableWrapper {
+    /** Completable wrapper upd. */
+    private static final AtomicReferenceFieldUpdater<GridCacheReturnCompletableWrapper, Object> COMPLETABLE_WRAPPER_UPD =
+        AtomicReferenceFieldUpdater.newUpdater(GridCacheReturnCompletableWrapper.class, Object.class, "o");
+
+    /** */
+    private volatile Object o;
+
+    /** Node id. */
+    private final UUID nodeId;
+
+    /**
+     * @param nodeId Node id.
+     */
+    public GridCacheReturnCompletableWrapper(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return ID of the node that initiated the tx, or {@code null} if that node is the local node.
+     */
+    @Nullable public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * Marks as initialized.
+     *
+     * @param ret Return.
+     */
+    public void initialize(GridCacheReturn ret) {
+        final Object obj = this.o;
+
+        if (obj == null) {
+            boolean res = COMPLETABLE_WRAPPER_UPD.compareAndSet(this, null, ret);
+
+            if (!res)
+                initialize(ret);
+        }
+        else if (obj instanceof GridFutureAdapter) {
+            ((GridFutureAdapter)obj).onDone(ret);
+
+            boolean res = COMPLETABLE_WRAPPER_UPD.compareAndSet(this, obj, ret);
+
+            assert res;
+        }
+        else
+            throw new IllegalStateException("GridCacheReturnCompletableWrapper can't be reinitialized");
+    }
+
+    /**
+     * Allows waiting for a properly initialized value.
+     */
+    public IgniteInternalFuture<GridCacheReturn> fut() {
+        final Object obj = this.o;
+
+        if (obj instanceof GridCacheReturn)
+            return new GridFinishedFuture<>((GridCacheReturn)obj);
+        else if (obj instanceof IgniteInternalFuture)
+            return (IgniteInternalFuture)obj;
+        else if (obj == null) {
+            boolean res = COMPLETABLE_WRAPPER_UPD.compareAndSet(this, null, new GridFutureAdapter<>());
+
+            if (res)
+                return (IgniteInternalFuture)this.o;
+            else
+                return fut();
+        }
+        else
+            throw new IllegalStateException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
new file mode 100644
index 0000000..7145dc2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
+
+/**
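+ * Accumulates versions to ack per node and flushes them via {@code finish} when the buffer overflows or times out.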
+ */
+public abstract class GridDeferredAckMessageSender {
+    /** Deferred message buffers. */
+    private ConcurrentMap<UUID, DeferredAckMessageBuffer> deferredAckMsgBuffers = new ConcurrentHashMap8<>();
+
+    /** Timeout processor. */
+    private GridTimeoutProcessor time;
+
+    /** Closure processor. */
+    public GridClosureProcessor closure;
+
+    /**
+     * @param time Time.
+     * @param closure Closure.
+     */
+    public GridDeferredAckMessageSender(GridTimeoutProcessor time,
+        GridClosureProcessor closure) {
+        this.time = time;
+        this.closure = closure;
+    }
+
+    /**
+     *
+     */
+    public abstract int getTimeout();
+
+    /**
+     *
+     */
+    public abstract int getBufferSize();
+
+    /**
+     *
+     */
+    public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers);
+
+    /**
+     *
+     */
+    public void stop() {
+        for (DeferredAckMessageBuffer buf : deferredAckMsgBuffers.values())
+            buf.finish0();
+    }
+
+    /**
+     * @param nodeId Node ID to send message to.
+     * @param ver Version to ack.
+     */
+    public void sendDeferredAckMessage(UUID nodeId, GridCacheVersion ver) {
+        while (true) {
+            DeferredAckMessageBuffer buf = deferredAckMsgBuffers.get(nodeId);
+
+            if (buf == null) {
+                buf = new DeferredAckMessageBuffer(nodeId);
+
+                DeferredAckMessageBuffer old = deferredAckMsgBuffers.putIfAbsent(nodeId, buf);
+
+                if (old == null) {
+                    // We have successfully added buffer to map.
+                    time.addTimeoutObject(buf);
+                }
+                else
+                    buf = old;
+            }
+
+            if (!buf.add(ver))
+                // Some thread is sending filled up buffer, we can remove it.
+                deferredAckMsgBuffers.remove(nodeId, buf);
+            else
+                break;
+        }
+    }
+
+    /**
+     * Deferred message buffer.
+     */
+    private class DeferredAckMessageBuffer extends ReentrantReadWriteLock implements GridTimeoutObject {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Filled atomic flag. */
+        private AtomicBoolean guard = new AtomicBoolean(false);
+
+        /** Versions. */
+        private ConcurrentLinkedDeque8<GridCacheVersion> vers = new ConcurrentLinkedDeque8<>();
+
+        /** Node ID. */
+        private final UUID nodeId;
+
+        /** Timeout ID. */
+        private final IgniteUuid timeoutId;
+
+        /** End time. */
+        private final long endTime;
+
+        /**
+         * @param nodeId Node ID to send message to.
+         */
+        private DeferredAckMessageBuffer(UUID nodeId) {
+            this.nodeId = nodeId;
+
+            timeoutId = IgniteUuid.fromUuid(nodeId);
+
+            endTime = U.currentTimeMillis() + getTimeout();
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid timeoutId() {
+            return timeoutId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long endTime() {
+            return endTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            if (guard.compareAndSet(false, true)) {
+                closure.runLocalSafe(new Runnable() {
+                    @Override public void run() {
+                        writeLock().lock();
+
+                        try {
+                            finish0();
+                        }
+                        finally {
+                            writeLock().unlock();
+                        }
+                    }
+                });
+            }
+        }
+
+        /**
+         * Adds deferred request to buffer.
+         *
+         * @param ver Version to send.
+         * @return {@code True} if request was handled, {@code false} if this buffer is filled and cannot be used.
+         */
+        public boolean add(GridCacheVersion ver) {
+            readLock().lock();
+
+            boolean snd = false;
+
+            try {
+                if (guard.get())
+                    return false;
+
+                vers.add(ver);
+
+                if (vers.sizex() > getBufferSize() && guard.compareAndSet(false, true))
+                    snd = true;
+            }
+            finally {
+                readLock().unlock();
+            }
+
+            if (snd) {
+                // Wait all threads in read lock to finish.
+                writeLock().lock();
+
+                try {
+                    finish0();
+
+                    time.removeTimeoutObject(this);
+                }
+                finally {
+                    writeLock().unlock();
+                }
+            }
+
+            return true;
+        }
+
+        /**
+         * Sends deferred notification message and removes this buffer from pending responses map.
+         */
+        private void finish0() {
+            finish(nodeId, vers);
+
+            deferredAckMsgBuffers.remove(nodeId, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 9d9862a..4adfa8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -36,6 +36,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
@@ -448,7 +450,25 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
                 Map<IgniteTxKey, IgniteTxEntry> writeMap = txState.writeMap();
 
+                GridCacheReturnCompletableWrapper wrapper = null;
+
                 if (!F.isEmpty(writeMap)) {
+                    GridCacheReturn ret = null;
+
+                    if (!near() && !local() && onePhaseCommit()) {
+                        if (needReturnValue()) {
+                            ret = new GridCacheReturn(null, cctx.localNodeId().equals(otherNodeId()), true, null, true);
+
+                            UUID origNodeId = otherNodeId(); // Originating node.
+
+                            cctx.tm().addCommittedTxReturn(this,
+                                wrapper = new GridCacheReturnCompletableWrapper(
+                                    !cctx.localNodeId().equals(origNodeId) ? origNodeId : null));
+                        }
+                        else
+                            cctx.tm().addCommittedTx(this, this.nearXidVersion(), null);
+                    }
+
                     // Register this transaction as completed prior to write-phase to
                     // ensure proper lock ordering for removed entries.
                     cctx.tm().addCommittedTx(this);
@@ -457,13 +477,13 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
                     batchStoreCommit(writeMap().values());
 
-                    // Node that for near transactions we grab all entries.
-                    for (IgniteTxEntry txEntry : (near() ? allEntries() : writeEntries())) {
-                        GridCacheContext cacheCtx = txEntry.context();
+                    try {
+                        // Note that for near transactions we grab all entries.
+                        for (IgniteTxEntry txEntry : (near() ? allEntries() : writeEntries())) {
+                            GridCacheContext cacheCtx = txEntry.context();
 
-                        boolean replicate = cacheCtx.isDrEnabled();
+                            boolean replicate = cacheCtx.isDrEnabled();
 
-                        try {
                             while (true) {
                                 try {
                                     GridCacheEntryEx cached = txEntry.cached();
@@ -486,7 +506,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                         txEntry.cached().unswap(false);
 
                                     IgniteBiTuple<GridCacheOperation, CacheObject> res =
-                                        applyTransformClosures(txEntry, false);
+                                        applyTransformClosures(txEntry, false, ret);
 
                                     GridCacheOperation op = res.get1();
                                     CacheObject val = res.get2();
@@ -672,21 +692,26 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                 }
                             }
                         }
-                        catch (Throwable ex) {
-                            // In case of error, we still make the best effort to commit,
-                            // as there is no way to rollback at this point.
-                            err = new IgniteTxHeuristicCheckedException("Commit produced a runtime exception " +
-                                "(all transaction entries will be invalidated): " + CU.txString(this), ex);
+                    }
+                    catch (Throwable ex) {
+                        // In case of error, we still make the best effort to commit,
+                        // as there is no way to rollback at this point.
+                        err = new IgniteTxHeuristicCheckedException("Commit produced a runtime exception " +
+                            "(all transaction entries will be invalidated): " + CU.txString(this), ex);
 
-                            U.error(log, "Commit failed.", err);
+                        U.error(log, "Commit failed.", err);
 
-                            uncommit();
+                        uncommit();
 
-                            state(UNKNOWN);
+                        state(UNKNOWN);
 
-                            if (ex instanceof Error)
-                                throw (Error)ex;
-                        }
+                        if (ex instanceof Error)
+                            throw (Error)ex;
+
+                    }
+                    finally {
+                        if (wrapper != null)
+                            wrapper.initialize(ret);
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index d2e26b4..ac2ab41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -351,7 +351,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                 tx.size(),
                 tx.subjectId(),
                 tx.taskNameHash(),
-                tx.activeCachesDeploymentEnabled());
+                tx.activeCachesDeploymentEnabled(),
+                false,
+                false);
 
             try {
                 cctx.io().send(n, req, tx.ioPolicy());
@@ -448,7 +450,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                 tx.subjectId(),
                 tx.taskNameHash(),
                 tx.activeCachesDeploymentEnabled(),
-                updCntrs);
+                updCntrs,
+                false,
+                false);
 
             req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion());
 
@@ -516,7 +520,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                     tx.size(),
                     tx.subjectId(),
                     tx.taskNameHash(),
-                    tx.activeCachesDeploymentEnabled());
+                    tx.activeCachesDeploymentEnabled(),
+                    false,
+                    false);
 
                 req.writeVersion(tx.writeVersion());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 2d98e0d..c618a18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -46,6 +46,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     /** */
     public static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
 
+    /** */
+    public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x02;
+
     /** Near node ID. */
     private UUID nearNodeId;
 
@@ -141,7 +144,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
         int txSize,
         @Nullable UUID subjId,
         int taskNameHash,
-        boolean addDepInfo
+        boolean addDepInfo,
+        boolean retVal,
+        boolean waitRemoteTxs
     ) {
         super(
             xidVer,
@@ -172,6 +177,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
         this.sysInvalidate = sysInvalidate;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
+
+        needReturnValue(retVal);
+        waitRemoteTransactions(waitRemoteTxs);
     }
 
     /**
@@ -224,11 +232,13 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
         @Nullable UUID subjId,
         int taskNameHash,
         boolean addDepInfo,
-        Collection<Long> updateIdxs
+        Collection<Long> updateIdxs,
+        boolean retVal,
+        boolean waitRemoteTxs
     ) {
         this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc,
             sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize,
-            subjId, taskNameHash, addDepInfo);
+            subjId, taskNameHash, addDepInfo, retVal, waitRemoteTxs);
 
         if (updateIdxs != null && !updateIdxs.isEmpty()) {
             partUpdateCnt = new GridLongList(updateIdxs.size());
@@ -339,6 +349,23 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
             flags &= ~WAIT_REMOTE_TX_FLAG_MASK;
     }
 
+    /**
+     * @return {@code True} if the {@link #NEED_RETURN_VALUE_FLAG_MASK} bit is set in {@code flags} (return value needed).
+     */
+    public boolean needReturnValue() {
+        return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @param retVal {@code True} to set the {@link #NEED_RETURN_VALUE_FLAG_MASK} bit in {@code flags}, {@code false} to clear it.
+     */
+    public void needReturnValue(boolean retVal) {
+        if (retVal)
+            flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK);
+        else
+            flags &= ~NEED_RETURN_VALUE_FLAG_MASK;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtTxFinishRequest.class, this, super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index 78dc16f..0618172 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -19,9 +19,10 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -51,6 +52,9 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
     /** Flag indicating if this is a check-committed response. */
     private boolean checkCommitted;
 
+    /** Cache return value. */
+    private GridCacheReturn retVal;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -112,6 +116,14 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
 
         if (checkCommittedErr != null && checkCommittedErrBytes == null)
             checkCommittedErrBytes = ctx.marshaller().marshal(checkCommittedErr);
+
+        if (retVal != null && retVal.cacheId() != 0) {
+            GridCacheContext cctx = ctx.cacheContext(retVal.cacheId());
+
+            assert cctx != null : retVal.cacheId();
+
+            retVal.prepareMarshal(cctx);
+        }
     }
 
     /** {@inheritDoc} */
@@ -121,6 +133,28 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
 
         if (checkCommittedErrBytes != null && checkCommittedErr == null)
             checkCommittedErr = ctx.marshaller().unmarshal(checkCommittedErrBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+
+        if (retVal != null && retVal.cacheId() != 0) {
+            GridCacheContext cctx = ctx.cacheContext(retVal.cacheId());
+
+            assert cctx != null : retVal.cacheId();
+
+            retVal.finishUnmarshal(cctx, ldr);
+        }
+    }
+
+    /**
+     * @param retVal Return value.
+     */
+    public void returnValue(GridCacheReturn retVal) {
+        this.retVal = retVal;
+    }
+
+    /**
+     * @return Return value.
+     */
+    public GridCacheReturn returnValue() {
+        return retVal;
     }
 
     /** {@inheritDoc} */
@@ -161,6 +195,12 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
 
                 writer.incrementState();
 
+            case 8:
+                if (!writer.writeMessage("retVal", retVal))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -201,6 +241,14 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
 
                 reader.incrementState();
 
+            case 8:
+                retVal = reader.readMessage("retVal");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridDhtTxFinishResponse.class);
@@ -213,6 +261,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 8;
+        return 9;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
new file mode 100644
index 0000000..0c8ae69
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * One-phase commit near transaction ack request; carries a collection of {@link GridCacheVersion}s.
+ */
+public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Lock or transaction versions. */
+    @GridToStringInclude
+    @GridDirectCollection(GridCacheVersion.class)
+    protected Collection<GridCacheVersion> vers;
+
+    /**
+     * Default constructor.
+     */
+    public GridDhtTxOnePhaseCommitAckRequest() {
+        // No-op.
+    }
+
+    /**
+     * Creates an ack request for the given versions.
+     * @param vers Near transaction xid versions.
+     */
+    public GridDhtTxOnePhaseCommitAckRequest(Collection<GridCacheVersion> vers) {
+        this.vers = vers;
+    }
+
+    /**
+     * @return Versions passed to the constructor or read from the wire by {@link #readFrom}.
+     */
+    public Collection<GridCacheVersion> versions() {
+        return vers;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtTxOnePhaseCommitAckRequest.class, this, super.toString());
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return addDepInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeCollection("vers", vers, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                vers = reader.readCollection("vers", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridDhtTxOnePhaseCommitAckRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -27;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 4;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index ec73bff..1dbda69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1245,7 +1245,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         tx.onePhaseCommit(),
                         tx.subjectId(),
                         tx.taskNameHash(),
-                        tx.activeCachesDeploymentEnabled());
+                        tx.activeCachesDeploymentEnabled(),
+                        retVal);
 
                     int idx = 0;
 
@@ -1356,7 +1357,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                             tx.onePhaseCommit(),
                             tx.subjectId(),
                             tx.taskNameHash(),
-                            tx.activeCachesDeploymentEnabled());
+                            tx.activeCachesDeploymentEnabled(),
+                            retVal);
 
                         for (IgniteTxEntry entry : nearMapping.entries()) {
                             if (CU.writes().apply(entry)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 1cdc96f..a8f2087 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -52,6 +52,9 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x01;
+
     /** Max order. */
     private UUID nearNodeId;
 
@@ -100,6 +103,9 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
     /** Preload keys. */
     private BitSet preloadKeys;
 
+    /** */
+    private byte flags;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -118,6 +124,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
      * @param txNodes Transaction nodes mapping.
      * @param nearXidVer Near transaction ID.
      * @param last {@code True} if this is last prepare request for node.
+     * @param retVal Need return value flag.
      * @param addDepInfo Deployment info flag.
      */
     public GridDhtTxPrepareRequest(
@@ -134,7 +141,8 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
         boolean onePhaseCommit,
         UUID subjId,
         int taskNameHash,
-        boolean addDepInfo) {
+        boolean addDepInfo,
+        boolean retVal) {
         super(tx, timeout, null, dhtWrites, txNodes, onePhaseCommit, addDepInfo);
 
         assert futId != null;
@@ -149,12 +157,31 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
 
+        needReturnValue(retVal);
+
         invalidateNearEntries = new BitSet(dhtWrites == null ? 0 : dhtWrites.size());
 
         nearNodeId = tx.nearNodeId();
     }
 
     /**
+     * @return Flag indicating whether transaction needs return value.
+     */
+    public boolean needReturnValue() {
+        return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @param retVal Need return value.
+     */
+    public void needReturnValue(boolean retVal) {
+        if (retVal)
+            flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK);
+        else
+            flags &= ~NEED_RETURN_VALUE_FLAG_MASK;
+    }
+
+    /**
      * @return {@code True} if this is last prepare request for node.
      */
     public boolean last() {
@@ -348,78 +375,84 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
         switch (writer.state()) {
             case 23:
-                if (!writer.writeIgniteUuid("futId", futId))
+                if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries))
+                if (!writer.writeIgniteUuid("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 25:
-                if (!writer.writeBoolean("last", last))
+                if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries))
                     return false;
 
                 writer.incrementState();
 
             case 26:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeBoolean("last", last))
                     return false;
 
                 writer.incrementState();
 
             case 27:
-                if (!writer.writeUuid("nearNodeId", nearNodeId))
+                if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
             case 28:
-                if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
+                if (!writer.writeUuid("nearNodeId", nearNodeId))
                     return false;
 
                 writer.incrementState();
 
             case 29:
-                if (!writer.writeMessage("nearXidVer", nearXidVer))
+                if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 30:
-                if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("nearXidVer", nearXidVer))
                     return false;
 
                 writer.incrementState();
 
             case 31:
-                if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 32:
-                if (!writer.writeBitSet("preloadKeys", preloadKeys))
+                if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 33:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeBitSet("preloadKeys", preloadKeys))
                     return false;
 
                 writer.incrementState();
 
             case 34:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 35:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 36:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -442,7 +475,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
         switch (reader.state()) {
             case 23:
-                futId = reader.readIgniteUuid("futId");
+                flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
                     return false;
@@ -450,7 +483,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 24:
-                invalidateNearEntries = reader.readBitSet("invalidateNearEntries");
+                futId = reader.readIgniteUuid("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -458,7 +491,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 25:
-                last = reader.readBoolean("last");
+                invalidateNearEntries = reader.readBitSet("invalidateNearEntries");
 
                 if (!reader.isLastRead())
                     return false;
@@ -466,7 +499,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 26:
-                miniId = reader.readIgniteUuid("miniId");
+                last = reader.readBoolean("last");
 
                 if (!reader.isLastRead())
                     return false;
@@ -474,7 +507,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 27:
-                nearNodeId = reader.readUuid("nearNodeId");
+                miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -482,7 +515,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 28:
-                nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
+                nearNodeId = reader.readUuid("nearNodeId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -490,7 +523,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 29:
-                nearXidVer = reader.readMessage("nearXidVer");
+                nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -498,7 +531,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 30:
-                ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
+                nearXidVer = reader.readMessage("nearXidVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -506,7 +539,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 31:
-                ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
+                ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -514,7 +547,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 32:
-                preloadKeys = reader.readBitSet("preloadKeys");
+                ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -522,7 +555,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 33:
-                subjId = reader.readUuid("subjId");
+                preloadKeys = reader.readBitSet("preloadKeys");
 
                 if (!reader.isLastRead())
                     return false;
@@ -530,7 +563,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 34:
-                taskNameHash = reader.readInt("taskNameHash");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -538,6 +571,14 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 35:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 36:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -557,6 +598,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 36;
+        return 37;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index dc27eb1..6ad20c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -189,9 +189,9 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
             commitVer,
             sys,
             plc,
-            concurrency, 
-            isolation, 
-            invalidate, 
+            concurrency,
+            isolation,
+            invalidate,
             timeout,
             txSize,
             subjId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 1e45fa7..30a3d57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -29,9 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.EntryProcessorResult;
@@ -60,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
+import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
@@ -82,7 +80,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -102,11 +99,9 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
-import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE;
@@ -144,7 +139,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
 
     /** Pending */
-    private ConcurrentMap<UUID, DeferredResponseBuffer> pendingResponses = new ConcurrentHashMap8<>();
+    private GridDeferredAckMessageSender deferredUpdateMessageSender;
 
     /** */
     private GridNearAtomicCache<K, V> near;
@@ -240,6 +235,53 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
+        deferredUpdateMessageSender = new GridDeferredAckMessageSender(ctx.time(), ctx.closures()) {
+            @Override public int getTimeout() {
+                return DEFERRED_UPDATE_RESPONSE_TIMEOUT;
+            }
+
+            @Override public int getBufferSize() {
+                return DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE;
+            }
+
+            @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers) {
+                GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
+                    vers, ctx.deploymentEnabled());
+
+                try {
+                    ctx.kernalContext().gateway().readLock();
+
+                    try {
+                        ctx.io().send(nodeId, msg, ctx.ioPolicy());
+
+                        if (msgLog.isDebugEnabled()) {
+                            msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureVersions() +
+                                ", node=" + nodeId + ']');
+                        }
+                    }
+                    finally {
+                        ctx.kernalContext().gateway().readUnlock();
+                    }
+                }
+                catch (IllegalStateException ignored) {
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("Failed to send deferred DHT update response, node is stopping [" +
+                            "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
+                    }
+                }
+                catch (ClusterTopologyCheckedException ignored) {
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("Failed to send deferred DHT update response, node left [" +
+                            "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send deferred DHT update response to remote node [" +
+                        "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']', e);
+                }
+            }
+        };
+
         CacheMetricsImpl m = new CacheMetricsImpl(ctx);
 
         if (ctx.dht().near() != null)
@@ -405,8 +447,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
     /** {@inheritDoc} */
     @Override public void stop() {
-        for (DeferredResponseBuffer buf : pendingResponses.values())
-            buf.finish();
+        deferredUpdateMessageSender.stop();
     }
 
     /**
@@ -3208,28 +3249,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param ver Version to ack.
      */
     private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver) {
-        while (true) {
-            DeferredResponseBuffer buf = pendingResponses.get(nodeId);
-
-            if (buf == null) {
-                buf = new DeferredResponseBuffer(nodeId);
-
-                DeferredResponseBuffer old = pendingResponses.putIfAbsent(nodeId, buf);
-
-                if (old == null) {
-                    // We have successfully added buffer to map.
-                    ctx.time().addTimeoutObject(buf);
-                }
-                else
-                    buf = old;
-            }
-
-            if (!buf.addResponse(ver))
-                // Some thread is sending filled up buffer, we can remove it.
-                pendingResponses.remove(nodeId, buf);
-            else
-                break;
-        }
+        deferredUpdateMessageSender.sendDeferredAckMessage(nodeId, ver);
     }
 
     /**
@@ -3452,149 +3472,4 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             return Collections.emptyList();
         }
     }
-
-    /**
-     * Deferred response buffer.
-     */
-    private class DeferredResponseBuffer extends ReentrantReadWriteLock implements GridTimeoutObject {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Filled atomic flag. */
-        private AtomicBoolean guard = new AtomicBoolean(false);
-
-        /** Response versions. */
-        private ConcurrentLinkedDeque8<GridCacheVersion> respVers = new ConcurrentLinkedDeque8<>();
-
-        /** Node ID. */
-        private final UUID nodeId;
-
-        /** Timeout ID. */
-        private final IgniteUuid timeoutId;
-
-        /** End time. */
-        private final long endTime;
-
-        /**
-         * @param nodeId Node ID to send message to.
-         */
-        private DeferredResponseBuffer(UUID nodeId) {
-            this.nodeId = nodeId;
-
-            timeoutId = IgniteUuid.fromUuid(nodeId);
-
-            endTime = U.currentTimeMillis() + DEFERRED_UPDATE_RESPONSE_TIMEOUT;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteUuid timeoutId() {
-            return timeoutId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public long endTime() {
-            return endTime;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onTimeout() {
-            if (guard.compareAndSet(false, true)) {
-                ctx.closures().runLocalSafe(new Runnable() {
-                    @Override public void run() {
-                        writeLock().lock();
-
-                        try {
-                            finish();
-                        }
-                        finally {
-                            writeLock().unlock();
-                        }
-                    }
-                });
-            }
-        }
-
-        /**
-         * Adds deferred response to buffer.
-         *
-         * @param ver Version to send.
-         * @return {@code True} if response was handled, {@code false} if this buffer is filled and cannot be used.
-         */
-        public boolean addResponse(GridCacheVersion ver) {
-            readLock().lock();
-
-            boolean snd = false;
-
-            try {
-                if (guard.get())
-                    return false;
-
-                respVers.add(ver);
-
-                if (respVers.sizex() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true))
-                    snd = true;
-            }
-            finally {
-                readLock().unlock();
-            }
-
-            if (snd) {
-                // Wait all threads in read lock to finish.
-                writeLock().lock();
-
-                try {
-                    finish();
-
-                    ctx.time().removeTimeoutObject(this);
-                }
-                finally {
-                    writeLock().unlock();
-                }
-            }
-
-            return true;
-        }
-
-        /**
-         * Sends deferred notification message and removes this buffer from pending responses map.
-         */
-        private void finish() {
-            GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
-                respVers, ctx.deploymentEnabled());
-
-            try {
-                ctx.kernalContext().gateway().readLock();
-
-                try {
-                    ctx.io().send(nodeId, msg, ctx.ioPolicy());
-
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureVersions() +
-                            ", node=" + nodeId + ']');
-                    }
-                }
-                finally {
-                    ctx.kernalContext().gateway().readUnlock();
-                }
-            }
-            catch (IllegalStateException ignored) {
-                if (msgLog.isDebugEnabled()) {
-                    msgLog.debug("Failed to send deferred DHT update response, node is stopping [" +
-                        "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
-                }
-            }
-            catch (ClusterTopologyCheckedException ignored) {
-                if (msgLog.isDebugEnabled()) {
-                    msgLog.debug("Failed to send deferred DHT update response, node left [" +
-                        "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
-                }
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send deferred DHT update response to remote node [" +
-                    "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']', e);
-            }
-
-            pendingResponses.remove(nodeId, this);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index d251528..4cbfb27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -526,7 +526,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     ) {
         GridCacheContext cacheCtx = entry.context();
 
-        List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
+        List<ClusterNode> nodes = cacheCtx.isLocal() ?
+            cacheCtx.affinity().nodes(entry.key(), topVer) :
+            cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer);
 
         txMapping.addMapping(nodes);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index e17a76c..91cfbda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -27,7 +27,6 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
@@ -599,9 +598,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
         GridCacheEntryEx cached0 = entry.cached();
 
         if (cached0.isDht())
-            nodes = cacheCtx.affinity().nodes(cached0.partition(), topVer);
+            nodes = cacheCtx.topology().nodes(cached0.partition(), topVer);
         else
-            nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
+            nodes = cacheCtx.isLocal() ?
+                cacheCtx.affinity().nodes(entry.key(), topVer) :
+                cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer);
 
         txMapping.addMapping(nodes);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 34b8281..5c09398 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -193,7 +193,9 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
             GridCacheContext cacheCtx = txEntry.context();
 
-            List<ClusterNode> nodes = cacheCtx.affinity().nodes(txEntry.key(), topVer);
+            List<ClusterNode> nodes = cacheCtx.isLocal() ?
+                cacheCtx.affinity().nodes(txEntry.key(), topVer) :
+                cacheCtx.topology().nodes(cacheCtx.affinity().partition(txEntry.key()), topVer);
 
             ClusterNode primary = F.first(nodes);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index bb5d482..46604c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -34,6 +34,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
@@ -76,6 +78,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     public static final IgniteProductVersion PRIMARY_SYNC_TXS_SINCE = IgniteProductVersion.fromString("1.6.0");
 
     /** */
+    public static final IgniteProductVersion ACK_DHT_ONE_PHASE_SINCE = IgniteProductVersion.fromString("1.6.8");
+
+    /** */
     private static final long serialVersionUID = 0L;
 
     /** Logger reference. */
@@ -251,6 +256,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
                         assert f.node().id().equals(nodeId);
 
+                        if (res.returnValue() != null)
+                            tx.implicitSingleResult(res.returnValue());
+
                         f.onDhtFinishResponse(res);
                     }
                 }
@@ -432,6 +440,50 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
         catch (IgniteCheckedException e) {
             onDone(e);
         }
+        finally {
+            if (commit &&
+                tx.onePhaseCommit() &&
+                !tx.writeMap().isEmpty()) // Readonly operations require no ack.
+                ackBackup();
+        }
+    }
+
+    /**
+     *
+     */
+    private void ackBackup() {
+        if (mappings.empty())
+            return;
+
+        if (!tx.needReturnValue() || !tx.implicit())
+            return; // GridCacheReturn was not saved at backup.
+
+        GridDistributedTxMapping mapping = mappings.singleMapping();
+
+        if (mapping != null) {
+            UUID nodeId = mapping.node().id();
+
+            Collection<UUID> backups = tx.transactionNodes().get(nodeId);
+
+            if (!F.isEmpty(backups)) {
+                assert backups.size() == 1 : backups;
+
+                UUID backupId = F.first(backups);
+
+                ClusterNode backup = cctx.discovery().node(backupId);
+
+                // Nothing to do if backup has left the grid.
+                if (backup == null) {
+                    // No-op.
+                }
+                else if (backup.isLocal())
+                    cctx.tm().removeTxReturn(tx.xidVersion());
+                else {
+                    if (ACK_DHT_ONE_PHASE_SINCE.compareToIgnoreTimestamp(backup.version()) <= 0)
+                        cctx.tm().sendDeferredAckResponse(backupId, tx.xidVersion());
+                }
+            }
+        }
     }
 
     /**
@@ -475,23 +527,48 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                         readyNearMappingFromBackup(mapping);
 
                         if (committed) {
-                            if (tx.syncMode() == FULL_SYNC) {
-                                GridCacheVersion nearXidVer = tx.nearXidVersion();
+                            try {
+                                if (tx.needReturnValue() && tx.implicit()) {
+                                    GridCacheReturnCompletableWrapper wrapper =
+                                        cctx.tm().getCommittedTxReturn(tx.xidVersion());
 
-                                assert nearXidVer != null : tx;
+                                    assert wrapper != null : tx.xidVersion();
 
-                                IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(nearXidVer);
+                                    GridCacheReturn retVal = wrapper.fut().get();
 
-                                fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                                    @Override public void apply(IgniteInternalFuture<?> fut) {
-                                        mini.onDone(tx);
-                                    }
-                                });
+                                    assert retVal != null;
+
+                                    tx.implicitSingleResult(retVal);
+                                }
 
-                                return;
+                                if (tx.syncMode() == FULL_SYNC) {
+                                    GridCacheVersion nearXidVer = tx.nearXidVersion();
+
+                                    assert nearXidVer != null : tx;
+
+                                    IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(nearXidVer);
+
+                                    fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                                        @Override public void apply(IgniteInternalFuture<?> fut) {
+                                            mini.onDone(tx);
+                                        }
+                                    });
+
+                                    return;
+                                }
+
+                                mini.onDone(tx);
                             }
+                            catch (IgniteCheckedException e) {
+                                if (msgLog.isDebugEnabled()) {
+                                    msgLog.debug("Near finish fut, failed to finish [" +
+                                        "txId=" + tx.nearXidVersion() +
+                                        ", node=" + backup.id() +
+                                        ", err=" + e + ']');
+                                }
 
-                            mini.onDone(tx);
+                                mini.onDone(e);
+                            }
                         }
                         else {
                             ClusterTopologyCheckedException cause =
@@ -504,7 +581,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                         }
                     }
                     else {
-                        GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId());
+                        GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId(), false);
 
                         // Preserve old behavior, otherwise response is not sent.
                         if (WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) > 0)
@@ -765,9 +842,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
     /**
      * @param miniId Mini future ID.
+     * @param waitRemoteTxs Wait for remote txs.
      * @return Finish request.
      */
-    private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId) {
+    private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId, boolean waitRemoteTxs) {
         GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
             cctx.localNodeId(),
             futureId(),
@@ -791,7 +869,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             0,
             null,
             0,
-            tx.activeCachesDeploymentEnabled());
+            tx.activeCachesDeploymentEnabled(),
+            !waitRemoteTxs && (tx.needReturnValue() && tx.implicit()),
+            waitRemoteTxs);
 
         finishReq.checkCommitted(true);
 
@@ -872,9 +952,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
                             add(mini);
 
-                            GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId());
-
-                            req.waitRemoteTransactions(true);
+                            GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId(), true);
 
                             for (UUID backupId : backups) {
                                 ClusterNode backup = cctx.discovery().node(backupId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index eb2989e..18c3011 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
@@ -151,6 +152,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     @GridToStringExclude
     protected GridCacheSharedContext<?, ?> cctx;
 
+    /** Need return value. */
+    protected boolean needRetVal;
+
     /**
      * End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>)
      * assigned to this transaction at the end of write phase.
@@ -695,6 +699,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     }
 
     /**
+     * @return Flag indicating whether transaction needs return value.
+     */
+    public boolean needReturnValue() {
+        return needRetVal;
+    }
+
+    /**
+     * @param needRetVal Need return value flag.
+     */
+    public void needReturnValue(boolean needRetVal) {
+        this.needRetVal = needRetVal;
+    }
+
+    /**
      * Gets remaining allowed transaction time.
      *
      * @return Remaining transaction time. {@code 0} if timeout isn't specified. {@code -1} if time is out.
@@ -1285,7 +1303,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
                         if (intercept || !F.isEmpty(e.entryProcessors()))
                             e.cached().unswap(false);
 
-                        IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(e, false);
+                        IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(e, false, null);
 
                         GridCacheContext cacheCtx = e.context();
 
@@ -1443,13 +1461,15 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     /**
      * @param txEntry Entry to process.
      * @param metrics {@code True} if metrics should be updated.
+     * @param ret Optional return value to initialize.
      * @return Tuple containing transformation results.
      * @throws IgniteCheckedException If failed to get previous value for transform.
      * @throws GridCacheEntryRemovedException If entry was concurrently deleted.
      */
     protected IgniteBiTuple<GridCacheOperation, CacheObject> applyTransformClosures(
         IgniteTxEntry txEntry,
-        boolean metrics) throws GridCacheEntryRemovedException, IgniteCheckedException {
+        boolean metrics,
+        @Nullable GridCacheReturn ret) throws GridCacheEntryRemovedException, IgniteCheckedException {
         GridCacheContext cacheCtx = txEntry.context();
 
         assert cacheCtx != null;
@@ -1457,8 +1477,12 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         if (isSystemInvalidate())
             return F.t(cacheCtx.writeThrough() ? RELOAD : DELETE, null);
 
-        if (F.isEmpty(txEntry.entryProcessors()))
+        if (F.isEmpty(txEntry.entryProcessors())) {
+            if (ret != null)
+                ret.value(cacheCtx, txEntry.value(), txEntry.keepBinary());
+
             return F.t(txEntry.op(), txEntry.value());
+        }
         else {
             T2<GridCacheOperation, CacheObject> calcVal = txEntry.entryProcessorCalculatedValue();
 
@@ -1508,17 +1532,27 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
                 CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(
                     txEntry.key(), key, cacheVal, val, ver, keepBinary, txEntry.cached());
 
+                Object procRes = null;
+                Exception err = null;
+
                 try {
                     EntryProcessor<Object, Object, Object> processor = t.get1();
 
-                    processor.process(invokeEntry, t.get2());
+                    procRes = processor.process(invokeEntry, t.get2());
 
                     val = invokeEntry.getValue();
 
                     key = invokeEntry.key();
                 }
-                catch (Exception ignore) {
-                    // No-op.
+                catch (Exception e) {
+                    err = e;
+                }
+
+                if (ret != null) {
+                    if (err != null || procRes != null)
+                        ret.addEntryProcessResult(txEntry.context(), txEntry.key(), null, procRes, err, keepBinary);
+                    else
+                        ret.invokeResult(true);
                 }
 
                 modified |= invokeEntry.modified();


[05/12] ignite git commit: IGNITE-3406 - Fix incorrect patch.

Posted by vo...@apache.org.
IGNITE-3406 - Fix incorrect patch.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c3eff6b6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c3eff6b6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c3eff6b6

Branch: refs/heads/ignite-1.6.8-hadoop
Commit: c3eff6b6c7817f83f07afcff8784ec6aa9473876
Parents: 147ab9c
Author: dkarachentsev <dk...@gridgain.com>
Authored: Mon Sep 19 16:18:14 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Mon Sep 19 16:18:14 2016 +0300

----------------------------------------------------------------------
 ...idAbstractCacheInterceptorRebalanceTest.java | 356 +++++++++++++++++++
 ...heInterceptorAtomicOffheapRebalanceTest.java |  30 ++
 ...GridCacheInterceptorAtomicRebalanceTest.java |  36 ++
 ...ceptorTransactionalOffheapRebalanceTest.java |  35 ++
 ...heInterceptorTransactionalRebalanceTest.java |  36 ++
 5 files changed, 493 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c3eff6b6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java
new file mode 100644
index 0000000..9405a19
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import javax.cache.Cache;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheInterceptor;
+import org.apache.ignite.cache.CacheInterceptorAdapter;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Runs cache operations while extra nodes are started; a cache interceptor checks the old values it is given.
+ */
+public abstract class GridAbstractCacheInterceptorRebalanceTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI in {@link #getConfiguration(String)}. */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Name of the tested cache. */
+    private static final String CACHE_NAME = "test_cache";
+
+    /** Number of keys loaded initially and then updated. */
+    private static final int CNT = 10_000;
+
+    /** Number of start/update/stop iterations performed by {@link #testRebalance(Operation)}. */
+    private static final int TEST_ITERATIONS = 5;
+
+    /** Exclusive upper bound of the grid indexes started during the update phase (index 1 is started first). */
+    private static final int NODES = 5;
+
+    /** Set to {@code true} by an interceptor when one of its checks throws. */
+    private static volatile boolean failed;
+
+    /** Interceptor passed to the cache configuration in {@link #getConfiguration(String)}. */
+    private static CacheInterceptor<Integer, Integer> interceptor;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        final CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(CACHE_NAME);
+
+        ccfg.setInterceptor(interceptor);
+        ccfg.setAtomicityMode(atomicityMode());
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setRebalanceMode(SYNC);
+        ccfg.setBackups(2);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        return cfg;
+    }
+
+    /**
+     * @return Cache atomicity mode.
+     */
+    protected abstract CacheAtomicityMode atomicityMode();
+
+    /**
+     * @return Cache memory mode. NOTE(review): not referenced by getConfiguration() in this class - confirm.
+     */
+    protected abstract CacheMemoryMode memoryMode();
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRebalanceUpdate() throws Exception {
+        interceptor = new RebalanceUpdateInterceptor();
+
+        testRebalance(new Operation() {
+            @Override public void run(final IgniteCache<Integer, Integer> cache, final Integer key, final Integer val) {
+                cache.put(key, val);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRebalanceUpdateInvoke() throws Exception {
+        interceptor = new RebalanceUpdateInterceptor();
+
+        final UpdateEntryProcessor proc = new UpdateEntryProcessor();
+
+        testRebalance(new Operation() {
+            @Override public void run(final IgniteCache<Integer, Integer> cache, final Integer key, final Integer val) {
+                cache.invoke(key, proc, val);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRebalanceRemoveInvoke() throws Exception {
+        interceptor = new RebalanceUpdateInterceptor();
+
+        final RemoveEntryProcessor proc = new RemoveEntryProcessor();
+
+        testRebalance(new Operation() {
+            @Override public void run(final IgniteCache<Integer, Integer> cache, final Integer key, final Integer val) {
+                cache.invoke(key, proc, val);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRebalanceRemove() throws Exception {
+        interceptor = new RebalanceRemoveInterceptor();
+
+        testRebalance(new Operation() {
+            @Override public void run(final IgniteCache<Integer, Integer> cache, final Integer key, final Integer val) {
+                cache.remove(key);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutIfAbsent() throws Exception {
+        interceptor = new RebalanceUpdateInterceptor();
+
+        testRebalance(new Operation() {
+            @Override public void run(final IgniteCache<Integer, Integer> cache, final Integer key, final Integer val) {
+                cache.putIfAbsent(key, val);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetAndPut() throws Exception {
+        interceptor = new RebalanceUpdateInterceptor();
+
+        testRebalance(new Operation() {
+            @Override public void run(final IgniteCache<Integer, Integer> cache, final Integer key, final Integer val) {
+                final Integer old = cache.getAndPut(key, val);
+
+                assert val == old + 1 : "Unexpected old value: " + old;
+            }
+        });
+    }
+
+    /**
+     * @param operation Operation to be tested.
+     * @throws Exception If failed.
+     */
+    private void testRebalance(final Operation operation) throws Exception {
+        interceptor = new RebalanceUpdateInterceptor(); // NOTE(review): overrides caller's interceptor - confirm.
+
+        for (int iter = 0; iter < TEST_ITERATIONS; iter++) {
+            log.info("Iteration: " + iter);
+
+            failed = false;
+
+            final IgniteEx ignite = startGrid(1);
+
+            final IgniteCache<Integer, Integer> cache = ignite.cache(CACHE_NAME);
+
+            for (int i = 0; i < CNT; i++)
+                cache.put(i, i);
+
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            final IgniteInternalFuture<Object> updFut = GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    latch.await();
+
+                    for (int j = 1; j <= 3; j++) {
+                        for (int i = 0; i < CNT; i++) {
+                            if (i % 2 == 0) { // Even keys go through an explicit transaction.
+                                try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                                    operation.run(cache, i, i + j);
+
+                                    tx.commit();
+                                }
+                            }
+                            else
+                                operation.run(cache, i, i + j);
+                        }
+                    }
+
+                    return null;
+                }
+            });
+
+            final IgniteInternalFuture<Object> rebFut = GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    latch.await();
+
+                    for (int i = 2; i < NODES; i++)
+                        startGrid(i);
+
+                    return null;
+                }
+            });
+
+            latch.countDown(); // Releases both async tasks, which wait on the same latch.
+
+            updFut.get();
+            rebFut.get();
+
+            stopAllGrids();
+
+            assertFalse(failed);
+        }
+    }
+
+    /**
+     * Cache operation that {@link #testRebalance(Operation)} applies to every key.
+     */
+    private interface Operation {
+        /**
+         * @param cache Cache.
+         * @param key Key.
+         * @param val Value.
+         */
+        void run(IgniteCache<Integer, Integer> cache, Integer key, Integer val);
+    }
+
+    /**
+     * Entry processor that sets the entry value to the first invoke argument.
+     */
+    private static class UpdateEntryProcessor implements EntryProcessor<Integer, Integer, Integer> {
+        /** {@inheritDoc} */
+        @Override public Integer process(final MutableEntry<Integer, Integer> entry,
+            final Object... arguments) throws EntryProcessorException {
+            entry.setValue((Integer) arguments[0]);
+
+            return null;
+        }
+    }
+
+    /**
+     * Entry processor that removes the entry and ignores its arguments.
+     */
+    private static class RemoveEntryProcessor implements EntryProcessor<Integer, Integer, Integer> {
+        /** {@inheritDoc} */
+        @Override public Integer process(final MutableEntry<Integer, Integer> entry,
+            final Object... arguments) throws EntryProcessorException {
+            entry.remove();
+
+            return null;
+        }
+    }
+
+    /**
+     * Interceptor checking on put that the old value is null for the first put, otherwise the new value minus one.
+     */
+    private static class RebalanceUpdateInterceptor extends CacheInterceptorAdapter<Integer, Integer> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Integer onBeforePut(final Cache.Entry entry, final Integer newVal) {
+            try {
+                boolean first = entry.getKey().equals(newVal); // The initial load puts a value equal to the key.
+
+                if (first)
+                    assertNull("Expected null old value: " + entry, entry.getValue());
+                else {
+                    Integer old = (Integer)entry.getValue();
+
+                    assertNotNull("Null old value: " + entry, old);
+                    assertEquals("Unexpected old value: " + entry, newVal.intValue(), old + 1);
+                }
+            }
+            catch (Throwable e) {
+                failed = true;
+
+                System.out.println("Unexpected error: " + e);
+                e.printStackTrace(System.out);
+            }
+
+            return newVal;
+        }
+    }
+
+    /**
+     * Interceptor checking on remove that the old value is not null and equals the key.
+     */
+    private static class RebalanceRemoveInterceptor extends CacheInterceptorAdapter<Integer, Integer> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Nullable @Override public IgniteBiTuple<Boolean, Integer> onBeforeRemove(
+            final Cache.Entry<Integer, Integer> entry) {
+            try {
+                assertNotNull("Null old value: " + entry, entry.getValue());
+                assertEquals("Unexpected old value: " + entry, entry.getKey(), entry.getValue());
+            }
+            catch (Throwable t) {
+                failed = true;
+
+                System.out.println("Unexpected error: " + t);
+                t.printStackTrace(System.out);
+            }
+
+            return new T2<>(true, null);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3eff6b6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAtomicOffheapRebalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAtomicOffheapRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAtomicOffheapRebalanceTest.java
new file mode 100644
index 0000000..103322f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAtomicOffheapRebalanceTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ * Variant of {@link GridCacheInterceptorAtomicRebalanceTest} whose memory mode is {@code OFFHEAP_TIERED}.
+ */
+public class GridCacheInterceptorAtomicOffheapRebalanceTest extends GridCacheInterceptorAtomicRebalanceTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return CacheMemoryMode.OFFHEAP_TIERED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3eff6b6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAtomicRebalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAtomicRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAtomicRebalanceTest.java
new file mode 100644
index 0000000..aaeda4b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAtomicRebalanceTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ * Interceptor rebalance test variant returning {@code ATOMIC} atomicity mode and {@code ONHEAP_TIERED} memory mode.
+ */
+public class GridCacheInterceptorAtomicRebalanceTest extends GridAbstractCacheInterceptorRebalanceTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return CacheMemoryMode.ONHEAP_TIERED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3eff6b6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorTransactionalOffheapRebalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorTransactionalOffheapRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorTransactionalOffheapRebalanceTest.java
new file mode 100644
index 0000000..bb90062
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorTransactionalOffheapRebalanceTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ * Variant of {@link GridCacheInterceptorTransactionalRebalanceTest} with {@code OFFHEAP_TIERED} and its own timeout.
+ */
+public class GridCacheInterceptorTransactionalOffheapRebalanceTest extends GridCacheInterceptorTransactionalRebalanceTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return CacheMemoryMode.OFFHEAP_TIERED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60_000;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3eff6b6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorTransactionalRebalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorTransactionalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorTransactionalRebalanceTest.java
new file mode 100644
index 0000000..bace87c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorTransactionalRebalanceTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ * Interceptor rebalance test variant returning {@code TRANSACTIONAL} atomicity and {@code ONHEAP_TIERED} memory mode.
+ */
+public class GridCacheInterceptorTransactionalRebalanceTest extends GridAbstractCacheInterceptorRebalanceTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return CacheMemoryMode.ONHEAP_TIERED;
+    }
+}


[09/12] ignite git commit: Client discovery: wait during join if receive RES_CONTINUE_JOIN, RES_WAIT.

Posted by vo...@apache.org.
Client discovery: wait during join if receive RES_CONTINUE_JOIN, RES_WAIT.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1372ce2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1372ce2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1372ce2

Branch: refs/heads/ignite-1.6.8-hadoop
Commit: c1372ce2f0633968036fcfb079718214605c3350
Parents: 780bf23
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 20 11:39:37 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 20 11:39:37 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java      | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c1372ce2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index bf7f519..2c85645 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -497,6 +497,8 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             Iterator<InetSocketAddress> it = addrs.iterator();
 
+            boolean wait = false;
+
             while (it.hasNext()) {
                 if (Thread.currentThread().isInterrupted())
                     throw new InterruptedException();
@@ -515,12 +517,17 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 Socket sock = sockAndRes.get1().socket();
 
+                if (log.isDebugEnabled())
+                    log.debug("Received response to join request [addr=" + addr + ", res=" + sockAndRes.get2() + ']');
+
                 switch (sockAndRes.get2()) {
                     case RES_OK:
                         return new T2<>(sockAndRes.get1(), sockAndRes.get3());
 
                     case RES_CONTINUE_JOIN:
                     case RES_WAIT:
+                        wait = true;
+
                         U.closeQuiet(sock);
 
                         break;
@@ -533,7 +540,16 @@ class ClientImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            if (addrs.isEmpty()) {
+            if (wait) {
+                if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout)
+                    return null;
+
+                if (log.isDebugEnabled())
+                    log.debug("Will wait before retry join.");
+
+                Thread.sleep(2000);
+            }
+            else if (addrs.isEmpty()) {
                 if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout)
                     return null;
 


[12/12] ignite git commit: Merge branch 'ignite-1.6.8' into ignite-1.6.8-hadoop

Posted by vo...@apache.org.
Merge branch 'ignite-1.6.8' into ignite-1.6.8-hadoop


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4d08b5cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4d08b5cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4d08b5cb

Branch: refs/heads/ignite-1.6.8-hadoop
Commit: 4d08b5cb52a7d0dc1218ebdb750cef8b0032f810
Parents: ee5a818 16b82b7
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Sep 23 13:40:27 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Sep 23 13:40:27 2016 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |  12 +
 .../internal/GridEventConsumeHandler.java       |   5 -
 .../internal/GridMessageListenHandler.java      |   5 -
 .../internal/binary/BinaryObjectExImpl.java     | 164 ++++++---
 .../communication/GridIoMessageFactory.java     |   6 +
 .../processors/cache/GridCacheEntryEx.java      |   8 +
 .../processors/cache/GridCacheMapEntry.java     |   9 +-
 .../GridCacheReturnCompletableWrapper.java      | 101 ++++++
 .../cache/GridDeferredAckMessageSender.java     | 219 ++++++++++++
 .../GridDistributedTxRemoteAdapter.java         |  65 +++-
 .../distributed/dht/GridDhtTxFinishFuture.java  |  12 +-
 .../distributed/dht/GridDhtTxFinishRequest.java |  33 +-
 .../dht/GridDhtTxFinishResponse.java            |  52 ++-
 .../dht/GridDhtTxOnePhaseCommitAckRequest.java  | 134 +++++++
 .../distributed/dht/GridDhtTxPrepareFuture.java |  42 ++-
 .../dht/GridDhtTxPrepareRequest.java            |  93 +++--
 .../cache/distributed/dht/GridDhtTxRemote.java  |   6 +-
 .../dht/atomic/GridDhtAtomicCache.java          | 227 +++---------
 ...arOptimisticSerializableTxPrepareFuture.java |   4 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   7 +-
 .../GridNearPessimisticTxPrepareFuture.java     |   4 +-
 .../near/GridNearTxFinishFuture.java            | 112 +++++-
 .../continuous/CacheContinuousQueryHandler.java |   5 -
 .../cache/transactions/IgniteTxAdapter.java     |  46 ++-
 .../cache/transactions/IgniteTxEntry.java       |  44 ++-
 .../cache/transactions/IgniteTxHandler.java     | 163 +++++++--
 .../transactions/IgniteTxLocalAdapter.java      |  27 +-
 .../cache/transactions/IgniteTxManager.java     | 154 +++++++-
 .../continuous/GridContinuousHandler.java       |   8 -
 .../continuous/GridContinuousProcessor.java     |  33 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  18 +-
 .../spi/swapspace/file/FileSwapSpaceSpi.java    |  38 +-
 .../binary/BinaryObjectToStringSelfTest.java    |  92 +++++
 .../CacheSwapUnswapGetTestSmallQueueSize.java   |  35 ++
 ...idAbstractCacheInterceptorRebalanceTest.java | 356 +++++++++++++++++++
 ...heInterceptorAtomicOffheapRebalanceTest.java |  30 ++
 ...GridCacheInterceptorAtomicRebalanceTest.java |  36 ++
 ...ceptorTransactionalOffheapRebalanceTest.java |  35 ++
 ...heInterceptorTransactionalRebalanceTest.java |  36 ++
 .../processors/cache/GridCacheTestEntryEx.java  |   4 +
 .../IgniteCacheInterceptorSelfTestSuite.java    |   5 +
 .../IgniteCachePutRetryAbstractSelfTest.java    |  39 +-
 ...gniteCachePutRetryTransactionalSelfTest.java |  75 +++-
 ...ContinuousQueryFailoverAbstractSelfTest.java |  99 ++++++
 ...eContinuousQueryMultiNodesFilteringTest.java | 161 +++++++++
 .../file/GridFileSwapSpaceSpiSelfTest.java      |  89 +++++
 .../IgniteBinaryObjectsTestSuite.java           |   2 +
 .../testsuites/IgniteCacheTestSuite4.java       |   2 +
 .../config/benchmark-client-mode.properties     |   2 +
 .../config/benchmark-tx-win.properties          |   2 +
 .../yardstick/config/benchmark-tx.properties    |   2 +
 .../yardstick/config/benchmark-win.properties   |   2 +
 modules/yardstick/config/benchmark.properties   |   2 +
 .../cache/IgniteGetAndPutBenchmark.java         |  41 +++
 .../cache/IgniteGetAndPutTxBenchmark.java       |  70 ++++
 .../cache/IgniteInvokeTxBenchmark.java          |  40 +++
 56 files changed, 2666 insertions(+), 447 deletions(-)
----------------------------------------------------------------------



[04/12] ignite git commit: IGNITE-3406 - Interceptor and continuous query get correct old value during rebalancing.

Posted by vo...@apache.org.
IGNITE-3406 - Interceptor and continuous query get correct old value during rebalancing.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/147ab9c0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/147ab9c0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/147ab9c0

Branch: refs/heads/ignite-1.6.8-hadoop
Commit: 147ab9c08f6ac7edecf656b23d8b25bfab91becf
Parents: c24caba
Author: dkarachentsev <dk...@gridgain.com>
Authored: Mon Sep 19 13:58:41 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Mon Sep 19 13:58:41 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryEx.java      |  8 ++
 .../processors/cache/GridCacheMapEntry.java     |  9 +-
 .../GridDistributedTxRemoteAdapter.java         |  6 ++
 .../distributed/dht/GridDhtTxPrepareFuture.java | 36 ++++++-
 .../cache/transactions/IgniteTxEntry.java       | 44 ++++++++-
 .../transactions/IgniteTxLocalAdapter.java      |  8 ++
 .../processors/cache/GridCacheTestEntryEx.java  |  4 +
 .../IgniteCacheInterceptorSelfTestSuite.java    |  5 +
 ...ContinuousQueryFailoverAbstractSelfTest.java | 99 ++++++++++++++++++++
 9 files changed, 213 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 616854f..ef6a244 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -358,6 +358,8 @@ public interface GridCacheEntryEx {
      * @param evt Flag to signal event notification.
      * @param metrics Flag to signal metrics update.
      * @param keepBinary Keep binary flag.
+     * @param oldValPresent {@code True} if the old value is present.
+     * @param oldVal Old value.
      * @param topVer Topology version.
      * @param filter Filter.
      * @param drType DR type.
@@ -383,6 +385,8 @@ public interface GridCacheEntryEx {
         boolean evt,
         boolean metrics,
         boolean keepBinary,
+        boolean oldValPresent,
+        @Nullable CacheObject oldVal,
         AffinityTopologyVersion topVer,
         CacheEntryPredicate[] filter,
         GridDrType drType,
@@ -402,6 +406,8 @@ public interface GridCacheEntryEx {
      * @param evt Flag to signal event notification.
      * @param metrics Flag to signal metrics notification.
      * @param keepBinary Keep binary flag.
+     * @param oldValPresent {@code True} if the old value is present.
+     * @param oldVal Old value.
      * @param topVer Topology version.
      * @param filter Filter.
      * @param drType DR type.
@@ -422,6 +428,8 @@ public interface GridCacheEntryEx {
         boolean evt,
         boolean metrics,
         boolean keepBinary,
+        boolean oldValPresent,
+        @Nullable CacheObject oldVal,
         AffinityTopologyVersion topVer,
         CacheEntryPredicate[] filter,
         GridDrType drType,

http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index c760ac1..a9ac1e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1141,6 +1141,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         boolean evt,
         boolean metrics,
         boolean keepBinary,
+        boolean oldValPresent,
+        @Nullable CacheObject oldVal,
         AffinityTopologyVersion topVer,
         CacheEntryPredicate[] filter,
         GridDrType drType,
@@ -1198,7 +1200,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             Map<UUID, CacheContinuousQueryListener> lsnrCol =
                 notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null;
 
-            old = (retval || intercept || lsnrCol != null) ?
+            old = oldValPresent ? oldVal :
+                (retval || intercept || lsnrCol != null) ?
                 rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : this.val;
 
             if (intercept) {
@@ -1333,6 +1336,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         boolean evt,
         boolean metrics,
         boolean keepBinary,
+        boolean oldValPresent,
+        @Nullable CacheObject oldVal,
         AffinityTopologyVersion topVer,
         CacheEntryPredicate[] filter,
         GridDrType drType,
@@ -1403,7 +1408,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             Map<UUID, CacheContinuousQueryListener> lsnrCol =
                 notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null;
 
-            old = (retval || intercept || lsnrCol != null) ?
+            old = oldValPresent ? oldVal : (retval || intercept || lsnrCol != null) ?
                 rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
 
             if (intercept) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index c56d1f7..9d9862a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -542,6 +542,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                 true,
                                                 true,
                                                 txEntry.keepBinary(),
+                                                txEntry.hasOldValue(),
+                                                txEntry.oldValue(),
                                                 topVer,
                                                 null,
                                                 replicate ? DR_BACKUP : DR_NONE,
@@ -561,6 +563,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                 true,
                                                 true,
                                                 txEntry.keepBinary(),
+                                                txEntry.hasOldValue(),
+                                                txEntry.oldValue(),
                                                 topVer,
                                                 null,
                                                 replicate ? DR_BACKUP : DR_NONE,
@@ -592,6 +596,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                             true,
                                             true,
                                             txEntry.keepBinary(),
+                                            txEntry.hasOldValue(),
+                                            txEntry.oldValue(),
                                             topVer,
                                             null,
                                             replicate ? DR_BACKUP : DR_NONE,

http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 1bdd9b8..ec73bff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -360,7 +360,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
                 boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters());
 
-                if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
+                CacheObject val;
+                CacheObject oldVal = null;
+
+                boolean readOld = hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM;
+
+                if (readOld) {
                     cached.unswap(retVal);
 
                     boolean readThrough = !txEntry.skipStore() &&
@@ -375,7 +380,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
                     final boolean keepBinary = txEntry.keepBinary();
 
-                    CacheObject val = cached.innerGet(
+                    val = oldVal = cached.innerGet(
                         null,
                         tx,
                         /*swap*/true,
@@ -470,6 +475,33 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                     else
                         ret.success(txEntry.op() != DELETE || cached.hasValue());
                 }
+
+                // Send old value if rebalancing is not finished yet.
+                final boolean sndOldVal = !cacheCtx.isLocal() && !cacheCtx.topology().rebalanceFinished(tx.topologyVersion());
+
+                if (sndOldVal) {
+                    if (oldVal == null && !readOld) {
+                        oldVal = cached.innerGet(
+                            null,
+                            tx,
+                            /*swap*/true,
+                            /*readThrough*/false,
+                            /*metrics*/false,
+                            /*event*/false,
+                            /*tmp*/false,
+                            /*subjectId*/tx.subjectId(),
+                            /*transformClo*/null,
+                            /*taskName*/null,
+                            /*expiryPlc*/null,
+                            /*keepBinary*/true);
+                    }
+
+                    if (oldVal != null) {
+                        oldVal.prepareMarshal(cacheCtx.cacheObjectContext());
+
+                        txEntry.oldValue(oldVal, true);
+                    }
+                }
             }
             catch (IgniteCheckedException e) {
                 U.error(log, "Failed to get result value for cache entry: " + cached, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 87b2525..194208e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -115,6 +115,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     @GridDirectTransient
     private TxEntryValueHolder prevVal = new TxEntryValueHolder();
 
+    /** Old value before update. */
+    @GridToStringInclude
+    private TxEntryValueHolder oldVal = new TxEntryValueHolder();
+
     /** Transform. */
     @GridToStringInclude
     @GridDirectTransient
@@ -497,7 +501,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     }
 
     /**
-     * @param oldValOnPrimary {@code True} If old value for 'invoke' operation was non null on primary node.
+     * @param oldValOnPrimary {@code True} if old value was non-null on primary node.
      */
     public void oldValueOnPrimary(boolean oldValOnPrimary) {
         setFlag(oldValOnPrimary, OLD_VAL_ON_PRIMARY);
@@ -583,6 +587,30 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     }
 
     /**
+     * @return Old value.
+     */
+    @Nullable public CacheObject oldValue() {
+        return oldVal != null ? oldVal.value() : null;
+    }
+
+    /**
+     * @param oldVal Old value.
+     */
+    public void oldValue(CacheObject oldVal, boolean hasOldVal) {
+        if (this.oldVal == null)
+            this.oldVal = new TxEntryValueHolder();
+
+        this.oldVal.value(op(), oldVal, hasOldVal, hasOldVal);
+    }
+
+    /**
+     * @return {@code True} if old value present.
+     */
+    public boolean hasOldValue() {
+        return oldVal != null && oldVal.hasValue();
+    }
+
+    /**
      * @return {@code True} if has value explicitly set.
      */
     public boolean hasValue() {
@@ -1069,6 +1097,11 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
 
                 writer.incrementState();
 
+            case 13:
+                if (!writer.writeMessage("oldVal", oldVal))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -1186,6 +1219,13 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
 
                 reader.incrementState();
 
+            case 13:
+                oldVal = reader.readMessage("oldVal");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return reader.afterMessageRead(IgniteTxEntry.class);
@@ -1198,7 +1238,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 13;
+        return 14;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index ee992cc..637f322 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -809,6 +809,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             evt,
                                             metrics,
                                             txEntry.keepBinary(),
+                                            txEntry.hasOldValue(),
+                                            txEntry.oldValue(),
                                             topVer,
                                             null,
                                             cached.detached() ? DR_NONE : drType,
@@ -834,6 +836,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                                 false,
                                                 metrics,
                                                 txEntry.keepBinary(),
+                                                txEntry.hasOldValue(),
+                                                txEntry.oldValue(),
                                                 topVer,
                                                 CU.empty0(),
                                                 DR_NONE,
@@ -854,6 +858,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             evt,
                                             metrics,
                                             txEntry.keepBinary(),
+                                            txEntry.hasOldValue(),
+                                            txEntry.oldValue(),
                                             topVer,
                                             null,
                                             cached.detached()  ? DR_NONE : drType,
@@ -875,6 +881,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                                 false,
                                                 metrics,
                                                 txEntry.keepBinary(),
+                                                txEntry.hasOldValue(),
+                                                txEntry.oldValue(),
                                                 topVer,
                                                 CU.empty0(),
                                                 DR_NONE,

http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 400fb14..bf543cb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -477,6 +477,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         boolean evt,
         boolean metrics,
         boolean keepBinary,
+        boolean hasOldVal,
+        @Nullable CacheObject oldVal,
         AffinityTopologyVersion topVer,
         CacheEntryPredicate[] filter,
         GridDrType drType,
@@ -556,6 +558,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         boolean evt,
         boolean metrics,
         boolean keepBinary,
+        boolean oldValPresent,
+        @Nullable CacheObject oldVal,
         AffinityTopologyVersion topVer,
         CacheEntryPredicate[] filter,
         GridDrType drType,

http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
index d19ecd7..17d88ae 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
@@ -58,6 +58,11 @@ public class IgniteCacheInterceptorSelfTestSuite extends TestSuite {
         suite.addTestSuite(CacheInterceptorPartitionCounterRandomOperationsTest.class);
         suite.addTestSuite(CacheInterceptorPartitionCounterLocalSanityTest.class);
 
+        suite.addTestSuite(GridCacheInterceptorAtomicRebalanceTest.class);
+        suite.addTestSuite(GridCacheInterceptorTransactionalRebalanceTest.class);
+        suite.addTestSuite(GridCacheInterceptorAtomicOffheapRebalanceTest.class);
+        suite.addTestSuite(GridCacheInterceptorTransactionalOffheapRebalanceTest.class);
+
         return suite;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 083367c..1376be1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -57,6 +57,8 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
@@ -65,6 +67,7 @@ import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
@@ -312,6 +315,102 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     }
 
     /**
+     * Tests that the correct old value is passed to the continuous query during rebalancing.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRebalance() throws Exception {
+        for (int iter = 0; iter < 5; iter++) {
+            log.info("Iteration: " + iter);
+
+            final IgniteEx ignite = startGrid(1);
+
+            final CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>("testCache");
+
+            ccfg.setAtomicityMode(atomicityMode());
+            ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+            ccfg.setCacheMode(cacheMode());
+            ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+            ccfg.setBackups(2);
+
+            final IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(ccfg);
+
+            final int KEYS = 10_000;
+
+            for (int i = 0; i < KEYS; i++)
+                cache.put(i, i);
+
+            final ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+            final AtomicBoolean err = new AtomicBoolean();
+
+            final AtomicInteger cntr = new AtomicInteger();
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+                @Override public void onUpdated(
+                    final Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> cacheEntryEvts) {
+                    try {
+                        for (final CacheEntryEvent<? extends Integer, ? extends Integer> evt : cacheEntryEvts) {
+                            final Integer oldVal = evt.getOldValue();
+
+                            final Integer val = evt.getValue();
+
+                            assertNotNull("No old value: " + evt, oldVal);
+                            assertEquals("Unexpected old value: " + evt, (Integer)(oldVal + 1), val);
+
+                            cntr.incrementAndGet();
+                        }
+                    }
+                    catch (Throwable e) {
+                        err.set(true);
+
+                        error("Unexpected error: " + e, e);
+                    }
+                }
+            });
+
+            final QueryCursor<Cache.Entry<Integer, Integer>> cur = cache.query(qry);
+
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            final IgniteInternalFuture<Object> updFut = GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    latch.await();
+
+                    for (int i = 0; i < KEYS && !err.get(); i++)
+                        cache.put(i, i + 1);
+
+                    return null;
+                }
+            });
+
+            final IgniteInternalFuture<Object> rebFut = GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    latch.await();
+
+                    for (int i = 2; i <= 5 && !err.get(); i++)
+                        startGrid(i);
+
+                    return null;
+                }
+            });
+
+            latch.countDown();
+
+            updFut.get();
+            rebFut.get();
+
+            assertFalse("Unexpected error during test", err.get());
+
+            assertTrue(cntr.get() > 0);
+
+            cur.close();
+
+            stopAllGrids();
+        }
+    }
+
+    /**
      * @param ignite Ignite.
      * @param topVer Topology version.
      * @throws Exception If failed.


[02/12] ignite git commit: IGNITE-3635 - Fixed StackOverflowError thrown from BinaryObject.toString()

Posted by vo...@apache.org.
IGNITE-3635 - Fixed StackOverflowError thrown from BinaryObject.toString()


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c0b2b479
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c0b2b479
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c0b2b479

Branch: refs/heads/ignite-1.6.8-hadoop
Commit: c0b2b4797be4f250f6f1304ff27d45c72154608a
Parents: 98914fe
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Fri Sep 16 14:59:35 2016 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Fri Sep 16 14:59:35 2016 -0700

----------------------------------------------------------------------
 .../internal/binary/BinaryObjectExImpl.java     | 161 +++++++++++++------
 .../binary/BinaryObjectToStringSelfTest.java    |  75 +++++++++
 .../IgniteBinaryObjectsTestSuite.java           |   2 +
 3 files changed, 190 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c0b2b479/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
index b4e909e..e6df407 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
@@ -20,14 +20,16 @@ package org.apache.ignite.internal.binary;
 import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.apache.ignite.internal.util.typedef.internal.SB;
-import org.apache.ignite.binary.BinaryObjectException;
-import org.apache.ignite.binary.BinaryType;
-import org.apache.ignite.binary.BinaryObject;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -164,6 +166,20 @@ public abstract class BinaryObjectExImpl implements BinaryObjectEx {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        try {
+            BinaryReaderHandles ctx = new BinaryReaderHandles();
+
+            ctx.put(start(), this);
+
+            return toString(ctx, new IdentityHashMap<BinaryObject, Integer>());
+        }
+        catch (BinaryObjectException e) {
+            throw new IgniteException("Failed to create string representation of binary object.", e);
+        }
+    }
+
     /**
      * @param ctx Reader context.
      * @param handles Handles for already traversed objects.
@@ -197,43 +213,7 @@ public abstract class BinaryObjectExImpl implements BinaryObjectEx {
 
                 buf.a(", ").a(name).a('=');
 
-                if (val instanceof byte[])
-                    buf.a(Arrays.toString((byte[]) val));
-                else if (val instanceof short[])
-                    buf.a(Arrays.toString((short[])val));
-                else if (val instanceof int[])
-                    buf.a(Arrays.toString((int[])val));
-                else if (val instanceof long[])
-                    buf.a(Arrays.toString((long[])val));
-                else if (val instanceof float[])
-                    buf.a(Arrays.toString((float[])val));
-                else if (val instanceof double[])
-                    buf.a(Arrays.toString((double[])val));
-                else if (val instanceof char[])
-                    buf.a(Arrays.toString((char[])val));
-                else if (val instanceof boolean[])
-                    buf.a(Arrays.toString((boolean[]) val));
-                else if (val instanceof BigDecimal[])
-                    buf.a(Arrays.toString((BigDecimal[])val));
-                else {
-                    if (val instanceof BinaryObjectExImpl) {
-                        BinaryObjectExImpl po = (BinaryObjectExImpl)val;
-
-                        Integer idHash0 = handles.get(val);
-
-                        if (idHash0 != null) {  // Circular reference.
-                            BinaryType meta0 = po.rawType();
-
-                            assert meta0 != null;
-
-                            buf.a(meta0.typeName()).a(" [hash=").a(idHash0).a(", ...]");
-                        }
-                        else
-                            buf.a(po.toString(ctx, handles));
-                    }
-                    else
-                        buf.a(val);
-                }
+                appendValue(val, buf, ctx, handles);
             }
 
             buf.a(']');
@@ -242,17 +222,102 @@ public abstract class BinaryObjectExImpl implements BinaryObjectEx {
         return buf.toString();
     }
 
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        try {
-            BinaryReaderHandles ctx = new BinaryReaderHandles();
+    /**
+     * @param val Value to append.
+     * @param buf Buffer to append to.
+     * @param ctx Reader context.
+     * @param handles Handles for already traversed objects.
+     */
+    private void appendValue(Object val, SB buf, BinaryReaderHandles ctx,
+        IdentityHashMap<BinaryObject, Integer> handles) {
+        if (val instanceof byte[])
+            buf.a(Arrays.toString((byte[]) val));
+        else if (val instanceof short[])
+            buf.a(Arrays.toString((short[])val));
+        else if (val instanceof int[])
+            buf.a(Arrays.toString((int[])val));
+        else if (val instanceof long[])
+            buf.a(Arrays.toString((long[])val));
+        else if (val instanceof float[])
+            buf.a(Arrays.toString((float[])val));
+        else if (val instanceof double[])
+            buf.a(Arrays.toString((double[])val));
+        else if (val instanceof char[])
+            buf.a(Arrays.toString((char[])val));
+        else if (val instanceof boolean[])
+            buf.a(Arrays.toString((boolean[]) val));
+        else if (val instanceof BigDecimal[])
+            buf.a(Arrays.toString((BigDecimal[])val));
+        else if (val instanceof BinaryObjectExImpl) {
+            BinaryObjectExImpl po = (BinaryObjectExImpl)val;
+
+            Integer idHash0 = handles.get(val);
+
+            if (idHash0 != null) {  // Circular reference.
+                BinaryType meta0 = po.rawType();
+
+                assert meta0 != null;
+
+                buf.a(meta0.typeName()).a(" [hash=").a(idHash0).a(", ...]");
+            }
+            else
+                buf.a(po.toString(ctx, handles));
+        }
+        else if (val instanceof Object[]) {
+            Object[] arr = (Object[])val;
 
-            ctx.put(start(), this);
+            buf.a('[');
 
-            return toString(ctx, new IdentityHashMap<BinaryObject, Integer>());
+            for (int i = 0; i < arr.length; i++) {
+                if (i > 0)
+                    buf.a(", ");
+
+                appendValue(arr[i], buf, ctx, handles);
+            }
+
+            buf.a(']'); // Close the bracket opened above.
         }
-        catch (BinaryObjectException e) {
-            throw new IgniteException("Failed to create string representation of binary object.", e);
+        else if (val instanceof Iterable) {
+            Iterable<Object> col = (Iterable<Object>)val;
+
+            buf.a(col.getClass().getSimpleName()).a(" {");
+
+            Iterator it = col.iterator();
+
+            while (it.hasNext()) {
+                Object o = it.next();
+
+                appendValue(o, buf, ctx, handles);
+
+                if (it.hasNext())
+                    buf.a(", ");
+            }
+
+            buf.a('}');
+        }
+        else if (val instanceof Map) {
+            Map<Object, Object> map = (Map<Object, Object>)val;
+
+            buf.a(map.getClass().getSimpleName()).a(" {");
+
+            Iterator<Map.Entry<Object, Object>> it = map.entrySet().iterator();
+
+            while (it.hasNext()) {
+                Map.Entry<Object, Object> e = it.next();
+
+                appendValue(e.getKey(), buf, ctx, handles);
+
+                buf.a('=');
+
+                appendValue(e.getValue(), buf, ctx, handles);
+
+                if (it.hasNext())
+                    buf.a(", ");
+            }
+
+            buf.a('}');
         }
+        else
+            buf.a(val);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c0b2b479/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
new file mode 100644
index 0000000..cc6cf8b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
@@ -0,0 +1,75 @@
+package org.apache.ignite.internal.binary;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests for {@code BinaryObject.toString()}.
+ */
+public class BinaryObjectToStringSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setMarshaller(new BinaryMarshaller());
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrid();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void testToString() throws Exception {
+        MyObject obj = new MyObject();
+
+        obj.arr = new Object[] {111, "aaa", obj};
+        obj.col = Arrays.asList(222, "bbb", obj);
+
+        obj.map = new HashMap();
+
+        obj.map.put(10, 333);
+        obj.map.put(20, "ccc");
+        obj.map.put(30, obj);
+
+        BinaryObject bo = grid().binary().toBinary(obj);
+
+        // Check that toString() doesn't fail with StackOverflowError or other exceptions.
+        bo.toString();
+    }
+
+    /**
+     */
+    private static class MyObject {
+        /** Object array. */
+        private Object[] arr;
+
+        /** Collection. */
+        private Collection col;
+
+        /** Map. */
+        private Map map;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c0b2b479/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
index dc0540d..c1d9974 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.binary.BinaryMarshallerSelfTest;
 import org.apache.ignite.internal.binary.BinaryObjectBuilderAdditionalSelfTest;
 import org.apache.ignite.internal.binary.BinaryObjectBuilderDefaultMappersSelfTest;
 import org.apache.ignite.internal.binary.BinaryObjectBuilderSimpleNameLowerCaseMappersSelfTest;
+import org.apache.ignite.internal.binary.BinaryObjectToStringSelfTest;
 import org.apache.ignite.internal.binary.BinarySimpleNameTestPropertySelfTest;
 import org.apache.ignite.internal.binary.BinaryTreeSelfTest;
 import org.apache.ignite.internal.binary.GridBinaryAffinityKeySelfTest;
@@ -102,6 +103,7 @@ public class IgniteBinaryObjectsTestSuite extends TestSuite {
         suite.addTestSuite(GridSimpleLowerCaseBinaryMappersBinaryMetaDataSelfTest.class);
         suite.addTestSuite(GridBinaryAffinityKeySelfTest.class);
         suite.addTestSuite(GridBinaryWildcardsSelfTest.class);
+        suite.addTestSuite(BinaryObjectToStringSelfTest.class);
 
         // Tests for objects with non-compact footers.
         suite.addTestSuite(BinaryMarshallerNonCompactSelfTest.class);


[10/12] ignite git commit: Added missing header to BinaryObjectToStringSelfTest.

Posted by vo...@apache.org.
Added missing header to BinaryObjectToStringSelfTest.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/135f0a8a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/135f0a8a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/135f0a8a

Branch: refs/heads/ignite-1.6.8-hadoop
Commit: 135f0a8a39fb6895fada18d210260deebfb9426d
Parents: c1372ce
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Sep 21 10:33:11 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 21 10:33:11 2016 +0300

----------------------------------------------------------------------
 .../binary/BinaryObjectToStringSelfTest.java       | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/135f0a8a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
index cc6cf8b..df6bcde 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.binary;
 
 import java.util.Arrays;


[08/12] ignite git commit: ignite-3810 Fixed hang in FileSwapSpaceSpi when too large value is stored

Posted by vo...@apache.org.
ignite-3810 Fixed hang in FileSwapSpaceSpi when too large value is stored


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/780bf23d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/780bf23d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/780bf23d

Branch: refs/heads/ignite-1.6.8-hadoop
Commit: 780bf23d5c89452dd062be4fab9e2e56d50bb9e2
Parents: 9b72d18
Author: sboikov <sb...@gridgain.com>
Authored: Mon Sep 19 18:19:33 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Sep 19 18:19:33 2016 +0300

----------------------------------------------------------------------
 .../spi/swapspace/file/FileSwapSpaceSpi.java    | 38 +++++++--
 .../CacheSwapUnswapGetTestSmallQueueSize.java   | 35 ++++++++
 .../file/GridFileSwapSpaceSpiSelfTest.java      | 89 ++++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite4.java       |  2 +
 4 files changed, 158 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index 8809f08..9be5b93 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -639,7 +639,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
         if (space == null && create) {
             validateName(name);
 
-            Space old = spaces.putIfAbsent(masked, space = new Space(masked));
+            Space old = spaces.putIfAbsent(masked, space = new Space(masked, log));
 
             if (old != null)
                 space = old;
@@ -833,13 +833,21 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
         /** */
         private final int maxSize;
 
+        /** */
+        private final IgniteLogger log;
+
+        /** */
+        private boolean queueSizeWarn;
+
         /**
          * @param minTakeSize Min size.
          * @param maxSize Max size.
+         * @param log Logger.
          */
-        private SwapValuesQueue(int minTakeSize, int maxSize) {
+        private SwapValuesQueue(int minTakeSize, int maxSize, IgniteLogger log) {
             this.minTakeSize = minTakeSize;
             this.maxSize = maxSize;
+            this.log = log;
         }
 
         /**
@@ -852,8 +860,24 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
             lock.lock();
 
             try {
-                while (size + val.len > maxSize)
-                    mayAdd.await();
+                boolean largeVal = val.len > maxSize;
+
+                if (largeVal) {
+                    if (!queueSizeWarn) {
+                        U.warn(log, "Trying to save in swap entry which have size more than write queue size. " +
+                            "You may wish to increase 'maxWriteQueueSize' in FileSwapSpaceSpi configuration " +
+                            "[queueMaxSize=" + maxSize + ", valSize=" + val.len + ']');
+
+                        queueSizeWarn = true;
+                    }
+
+                    while (size >= minTakeSize)
+                        mayAdd.await();
+                }
+                else {
+                    while (size + val.len > maxSize)
+                        mayAdd.await();
+                }
 
                 size += val.len;
 
@@ -1419,7 +1443,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
         private SwapFile right;
 
         /** */
-        private final SwapValuesQueue que = new SwapValuesQueue(writeBufSize, maxWriteQueSize);
+        private final SwapValuesQueue que;
 
         /** Partitions. */
         private final ConcurrentMap<Integer, ConcurrentMap<SwapKey, SwapValue>> parts =
@@ -1442,11 +1466,13 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
 
         /**
          * @param name Space name.
+         * @param log Logger.
          */
-        private Space(String name) {
+        private Space(String name, IgniteLogger log) {
             assert name != null;
 
             this.name = name;
+            this.que = new SwapValuesQueue(writeBufSize, maxWriteQueSize, log);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
new file mode 100644
index 0000000..8d189fe
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
+
+/**
+ * Runs {@link CacheSwapUnswapGetTest} with the swap SPI's max write queue size set to 2.
+ */
+public class CacheSwapUnswapGetTestSmallQueueSize extends CacheSwapUnswapGetTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((FileSwapSpaceSpi)cfg.getSwapSpaceSpi()).setMaxWriteQueueSize(2);
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
index 64652b1..ab21165 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
@@ -25,11 +25,14 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
@@ -37,8 +40,10 @@ import org.apache.ignite.spi.IgniteSpiCloseableIterator;
 import org.apache.ignite.spi.swapspace.GridSwapSpaceSpiAbstractSelfTest;
 import org.apache.ignite.spi.swapspace.SwapKey;
 import org.apache.ignite.spi.swapspace.SwapSpaceSpi;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
+import org.junit.Assert;
 
 /**
  * Test for {@link FileSwapSpaceSpi}.
@@ -364,4 +369,88 @@ public class GridFileSwapSpaceSpiSelfTest extends GridSwapSpaceSpiAbstractSelfTe
 
         assertEquals(hash0, hash1);
     }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testSaveValueLargeThenQueueSize() throws IgniteCheckedException {
+        final String spaceName = "mySpace";
+        final SwapKey key = new SwapKey("key");
+
+        final byte[] val = new byte[FileSwapSpaceSpi.DFLT_QUE_SIZE * 2];
+        Arrays.fill(val, (byte)1);
+
+        IgniteInternalFuture<byte[]> fut = GridTestUtils.runAsync(new Callable<byte[]>() {
+            @Override public byte[] call() throws Exception {
+                return saveAndGet(spaceName, key, val);
+            }
+        });
+
+        byte[] bytes = fut.get(10_000);
+
+        Assert.assertArrayEquals(val, bytes);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSaveValueLargeThenQueueSizeMultiThreaded() throws Exception {
+        final String spaceName = "mySpace";
+
+        final int threads = 5;
+
+        long DURATION = 30_000;
+
+        final int maxSize = FileSwapSpaceSpi.DFLT_QUE_SIZE * 2;
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        try {
+            IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    while (!done.get()) {
+                        SwapKey key = new SwapKey(rnd.nextInt(1000));
+
+                        spi.store(spaceName, key, new byte[rnd.nextInt(0, maxSize)], context());
+                    }
+
+                    return null;
+                }
+            }, threads, " async-put");
+
+            Thread.sleep(DURATION);
+
+            done.set(true);
+
+            fut.get();
+        }
+        finally {
+           done.set(true);
+        }
+    }
+
+    /**
+     * @param spaceName Space name.
+     * @param key Key.
+     * @param val Value.
+     * @throws Exception If failed.
+     * @return Read bytes.
+     */
+    private byte[] saveAndGet(final String spaceName, final SwapKey key, byte[] val) throws Exception {
+        spi.store(spaceName, key, val, context());
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return spi.read(spaceName, key, context()) != null;
+            }
+        }, 10_000);
+
+        byte[] res = spi.read(spaceName, key, context());
+
+        assertNotNull(res);
+
+        return res;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 60d59d7..c494e73 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeDynam
 import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartAtomicTest;
 import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartTxTest;
 import org.apache.ignite.internal.processors.cache.CacheSwapUnswapGetTest;
+import org.apache.ignite.internal.processors.cache.CacheSwapUnswapGetTestSmallQueueSize;
 import org.apache.ignite.internal.processors.cache.CacheTxNotAllowReadFromBackupTest;
 import org.apache.ignite.internal.processors.cache.CrossCacheLockTest;
 import org.apache.ignite.internal.processors.cache.GridCacheMarshallingNodeJoinSelfTest;
@@ -304,6 +305,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
         suite.addTestSuite(CacheVersionedEntryReplicatedTransactionalOffHeapSelfTest.class);
 
         suite.addTestSuite(CacheSwapUnswapGetTest.class);
+        suite.addTestSuite(CacheSwapUnswapGetTestSmallQueueSize.class);
 
         suite.addTestSuite(GridCacheDhtTxPreloadSelfTest.class);
         suite.addTestSuite(GridCacheNearTxPreloadSelfTest.class);


[03/12] ignite git commit: Merge remote-tracking branch 'community/ignite-1.6.8' into ignite-1.6.8

Posted by vo...@apache.org.
Merge remote-tracking branch 'community/ignite-1.6.8' into ignite-1.6.8


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c24cabaf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c24cabaf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c24cabaf

Branch: refs/heads/ignite-1.6.8-hadoop
Commit: c24cabafd69804b3ac8e2c08895c9b9b9597a7f3
Parents: c0b2b47 ebf354c
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Fri Sep 16 14:59:51 2016 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Fri Sep 16 14:59:51 2016 -0700

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |  35 ++-
 .../internal/GridEventConsumeHandler.java       |   5 -
 .../internal/GridMessageListenHandler.java      |   5 -
 .../cache/query/GridCacheQueryManager.java      |  13 +-
 .../continuous/CacheContinuousQueryHandler.java |   5 -
 .../continuous/GridContinuousHandler.java       |   8 -
 .../continuous/GridContinuousProcessor.java     |  33 +--
 ...eContinuousQueryMultiNodesFilteringTest.java | 161 ++++++++++++
 .../hadoop/fs/BasicHadoopFileSystemFactory.java |  17 +-
 .../processors/hadoop/HadoopClassLoader.java    |   6 +-
 .../processors/hadoop/HadoopClasspathMain.java  |   2 +-
 .../processors/hadoop/HadoopClasspathUtils.java | 230 +++++++++++++---
 .../processors/hadoop/HadoopDefaultJobInfo.java |   1 -
 .../internal/processors/hadoop/HadoopUtils.java |  53 ++--
 .../processors/hadoop/v2/HadoopV2Job.java       |  32 +--
 .../hadoop/v2/HadoopV2JobResourceManager.java   |   5 +-
 .../hadoop/v2/HadoopV2TaskContext.java          |  15 +-
 .../igfs/HadoopFIleSystemFactorySelfTest.java   |  10 -
 .../processors/hadoop/HadoopTestUtils.java      |  73 +++++-
 .../hadoop/HadoopUserLibsSelfTest.java          | 260 +++++++++++++++++++
 .../testsuites/IgniteHadoopTestSuite.java       |   3 +
 ...gniteCacheReplicatedFieldsQuerySelfTest.java |   6 +-
 .../IgniteCacheReplicatedQuerySelfTest.java     |   4 +-
 23 files changed, 811 insertions(+), 171 deletions(-)
----------------------------------------------------------------------



[11/12] ignite git commit: IGNITE-3635: Additional fix for stack overflow in binary objects.

Posted by vo...@apache.org.
IGNITE-3635: Additional fix for stack overflow in binary objects.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/16b82b77
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/16b82b77
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/16b82b77

Branch: refs/heads/ignite-1.6.8-hadoop
Commit: 16b82b77f00dff8e525c8cc68d3387de107c78d1
Parents: 135f0a8
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Sep 21 12:35:07 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 21 12:35:07 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/binary/BinaryObjectExImpl.java     | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/16b82b77/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
index e6df407..063bd83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
@@ -30,6 +30,7 @@ import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -248,6 +249,8 @@ public abstract class BinaryObjectExImpl implements BinaryObjectEx {
             buf.a(Arrays.toString((boolean[]) val));
         else if (val instanceof BigDecimal[])
             buf.a(Arrays.toString((BigDecimal[])val));
+        else if (val instanceof IgniteUuid)
+            buf.a(val);
         else if (val instanceof BinaryObjectExImpl) {
             BinaryObjectExImpl po = (BinaryObjectExImpl)val;