You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/18 21:21:59 UTC

[1/5] ignite git commit: Performance optimizations - reviewed by Yakov.

Repository: ignite
Updated Branches:
  refs/heads/ignite-single-op-get 4e1caa6ab -> a68645a61


http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index e8546ef..1a26028 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -273,7 +273,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
      * @throws IgniteCheckedException If failed.
      * @throws GridDistributedLockCancelledException If lock has been cancelled.
      */
-    @SuppressWarnings({"RedundantTypeArguments"})
+    @SuppressWarnings({"RedundantTypeArguments", "ForLoopReplaceableByForEach"})
     @Nullable public GridNearTxRemote startRemoteTx(UUID nodeId, GridDhtLockRequest req)
         throws IgniteCheckedException, GridDistributedLockCancelledException {
         List<KeyCacheObject> nearKeys = req.nearKeys();
@@ -285,6 +285,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
         if (ldr != null) {
             Collection<IgniteTxKey> evicted = null;
 
+            // Avoid iterator creation.
             for (int i = 0; i < nearKeys.size(); i++) {
                 KeyCacheObject key = nearKeys.get(i);
 
@@ -293,8 +294,6 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
 
                 IgniteTxKey txKey = ctx.txKey(key);
 
-                Collection<GridCacheMvccCandidate> cands = req.candidatesByIndex(i);
-
                 if (log.isDebugEnabled())
                     log.debug("Unmarshalled key: " + key);
 
@@ -356,8 +355,6 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
                                 req.owned(entry.key())
                             );
 
-                            assert cands.isEmpty() : "Received non-empty candidates in dht lock request: " + cands;
-
                             if (!req.inTx())
                                 ctx.evicts().touch(entry, req.topologyVersion());
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 31aa8c3..9c022b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -332,6 +333,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     /**
      * Initializes future.
      */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
     void finish() {
         if (tx.onNeedCheckBackup()) {
             assert tx.onePhaseCommit();
@@ -363,10 +365,18 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 if (!isSync() && !isDone()) {
                     boolean complete = true;
 
-                    for (IgniteInternalFuture<?> f : pending())
-                        // Mini-future in non-sync mode gets done when message gets sent.
-                        if (isMini(f) && !f.isDone())
-                            complete = false;
+                    synchronized (futs) {
+                        // Avoid collection copy and iterator creation.
+                        for (int i = 0; i < futs.size(); i++) {
+                            IgniteInternalFuture<IgniteInternalTx> f = futs.get(i);
+
+                            if (isMini(f) && !f.isDone()) {
+                                complete = false;
+
+                                break;
+                            }
+                        }
+                    }
 
                     if (complete)
                         onComplete();

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 434b6c7..b92be31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -461,21 +461,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     }
 
     /**
-     * @param nodeId Node ID.
-     * @param dhtVer DHT version.
-     * @param writeVer Write version.
-     */
-    void addDhtVersion(UUID nodeId, GridCacheVersion dhtVer, GridCacheVersion writeVer) {
-        // This step is very important as near and DHT versions grow separately.
-        cctx.versions().onReceived(nodeId, dhtVer);
-
-        GridDistributedTxMapping m = mappings.get(nodeId);
-
-        if (m != null)
-            m.dhtVersion(dhtVer, writeVer);
-    }
-
-    /**
      * @param nodeId Undo mapping.
      */
     @Override public boolean removeMapping(UUID nodeId) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 45477a0..cfaadc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -172,6 +172,8 @@ public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundFuture<
         assert res.error() == null : res;
         assert F.isEmpty(res.invalidPartitions()) : res;
 
+        UUID nodeId = m.node().id();
+
         for (Map.Entry<IgniteTxKey, CacheVersionedValue> entry : res.ownedValues().entrySet()) {
             IgniteTxEntry txEntry = tx.entry(entry.getKey());
 
@@ -187,7 +189,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundFuture<
                         CacheVersionedValue tup = entry.getValue();
 
                         nearEntry.resetFromPrimary(tup.value(), tx.xidVersion(),
-                            tup.version(), m.node().id(), tx.topologyVersion());
+                            tup.version(), nodeId, tx.topologyVersion());
                     }
                     else if (txEntry.cached().detached()) {
                         GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached();
@@ -229,11 +231,17 @@ public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundFuture<
             if (writeVer == null)
                 writeVer = res.dhtVersion();
 
-            // Register DHT version.
-            tx.addDhtVersion(m.node().id(), res.dhtVersion(), writeVer);
+            // This step is very important as near and DHT versions grow separately.
+            cctx.versions().onReceived(nodeId, res.dhtVersion());
 
+            // Register DHT version.
             m.dhtVersion(res.dhtVersion(), writeVer);
 
+            GridDistributedTxMapping map = tx.mappings().get(nodeId);
+
+            if (map != null)
+                map.dhtVersion(res.dhtVersion(), writeVer);
+
             if (m.near())
                 tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 798635a..9dfdb43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -71,7 +71,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
     /** Implicit single flag. */
     private boolean implicitSingle;
 
-    /** Explicit lock flag. Set to true if at leat one entry was explicitly locked. */
+    /** Explicit lock flag. Set to true if at least one entry was explicitly locked. */
     private boolean explicitLock;
 
     /** Subject ID. */
@@ -282,73 +282,73 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
         }
 
         switch (writer.state()) {
-            case 22:
+            case 23:
                 if (!writer.writeBoolean("explicitLock", explicitLock))
                     return false;
 
                 writer.incrementState();
 
-            case 23:
+            case 24:
                 if (!writer.writeBoolean("firstClientReq", firstClientReq))
                     return false;
 
                 writer.incrementState();
 
-            case 24:
+            case 25:
                 if (!writer.writeIgniteUuid("futId", futId))
                     return false;
 
                 writer.incrementState();
 
-            case 25:
+            case 26:
                 if (!writer.writeBoolean("implicitSingle", implicitSingle))
                     return false;
 
                 writer.incrementState();
 
-            case 26:
+            case 27:
                 if (!writer.writeBoolean("last", last))
                     return false;
 
                 writer.incrementState();
 
-            case 27:
+            case 28:
                 if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID))
                     return false;
 
                 writer.incrementState();
 
-            case 28:
+            case 29:
                 if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
-            case 29:
+            case 30:
                 if (!writer.writeBoolean("near", near))
                     return false;
 
                 writer.incrementState();
 
-            case 30:
+            case 31:
                 if (!writer.writeBoolean("retVal", retVal))
                     return false;
 
                 writer.incrementState();
 
-            case 31:
+            case 32:
                 if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
-            case 32:
+            case 33:
                 if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
-            case 33:
+            case 34:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -370,7 +370,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
             return false;
 
         switch (reader.state()) {
-            case 22:
+            case 23:
                 explicitLock = reader.readBoolean("explicitLock");
 
                 if (!reader.isLastRead())
@@ -378,7 +378,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 23:
+            case 24:
                 firstClientReq = reader.readBoolean("firstClientReq");
 
                 if (!reader.isLastRead())
@@ -386,7 +386,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 24:
+            case 25:
                 futId = reader.readIgniteUuid("futId");
 
                 if (!reader.isLastRead())
@@ -394,7 +394,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 25:
+            case 26:
                 implicitSingle = reader.readBoolean("implicitSingle");
 
                 if (!reader.isLastRead())
@@ -402,7 +402,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 26:
+            case 27:
                 last = reader.readBoolean("last");
 
                 if (!reader.isLastRead())
@@ -410,7 +410,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 27:
+            case 28:
                 lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID);
 
                 if (!reader.isLastRead())
@@ -418,7 +418,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 28:
+            case 29:
                 miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
@@ -426,7 +426,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 29:
+            case 30:
                 near = reader.readBoolean("near");
 
                 if (!reader.isLastRead())
@@ -434,7 +434,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 30:
+            case 31:
                 retVal = reader.readBoolean("retVal");
 
                 if (!reader.isLastRead())
@@ -442,7 +442,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 31:
+            case 32:
                 subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
@@ -450,7 +450,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 32:
+            case 33:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -458,7 +458,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 33:
+            case 34:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -478,7 +478,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 34;
+        return 35;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index f5f99f5..eb0db4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.lang.GridTuple;
 import org.apache.ignite.lang.IgniteAsyncSupported;
 import org.apache.ignite.lang.IgniteUuid;

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 6a0f8ab..3ddd909 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
 import org.apache.ignite.internal.util.lang.GridTuple;

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 9eb2808..23f83be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -599,20 +599,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
      */
     @SuppressWarnings("unchecked")
     public CacheObject applyEntryProcessors(CacheObject cacheVal) {
-        Object val = null;
-        Object keyVal = null;
-
         GridCacheVersion ver;
 
         try {
             ver = entry.version();
         }
-        catch (GridCacheEntryRemovedException e) {
+        catch (GridCacheEntryRemovedException ignore) {
             assert tx == null || tx.optimistic() : tx;
 
             ver = null;
         }
 
+        Object val = null;
+        Object keyVal = null;
+
         for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : entryProcessors()) {
             try {
                 CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(ctx, key, keyVal, cacheVal, val,
@@ -1078,5 +1078,4 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     @Override public String toString() {
         return GridToStringBuilder.toString(IgniteTxEntry.class, this, "xidVer", tx == null ? "null" : tx.xidVersion());
     }
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index 5f48469..c75a8f38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -147,6 +147,13 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean hasInterceptor(GridCacheSharedContext cctx) {
+        GridCacheContext ctx0 = cacheCtx;
+
+        return ctx0 != null && ctx0.config().getInterceptor() != null;
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) {
         if (cacheCtx == null)
             return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 758f82c..9e44b10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.GridLeanMap;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -74,7 +75,6 @@ import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.apache.ignite.internal.util.lang.GridTuple;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.C2;
@@ -358,6 +358,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /**
+     * @return {@code True} if transaction participates in a cache that has an interceptor configured.
+     */
+    public boolean hasInterceptor() {
+        return txState().hasInterceptor(cctx);
+    }
+
+    /**
      * @param needRetVal Need return value flag.
      */
     public void needReturnValue(boolean needRetVal) {
@@ -3045,7 +3052,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         try {
             Set<?> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet();
 
-            final Collection<KeyCacheObject> enlisted = new ArrayList<>();
+            final Collection<KeyCacheObject> enlisted = new ArrayList<>(keySet.size());
 
             CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
 
@@ -3434,7 +3441,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
      */
     public boolean init() {
         return !txState.init(txSize) || cctx.tm().onStarted(this);
-
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index ccccca0..67bca51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.IgnitePair;

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
index e7c4c96..3e5034b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
@@ -92,6 +92,11 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState
     }
 
     /** {@inheritDoc} */
+    @Override public boolean hasInterceptor(GridCacheSharedContext cctx) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) {
         assert false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index 81707ba..18fce8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -101,6 +101,12 @@ public interface IgniteTxState {
 
     /**
      * @param cctx Context.
+     * @return {@code True} if transaction spans one or more caches with configured interceptor.
+     */
+    public boolean hasInterceptor(GridCacheSharedContext cctx);
+
+    /**
+     * @param cctx Context.
      * @return Configured stores for active caches.
      */
     public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index c95fb19..213c5e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -24,11 +24,13 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheInterceptor;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -45,7 +47,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
  */
 public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     /** Active cache IDs. */
-    private Set<Integer> activeCacheIds = new HashSet<>();
+    private GridLongList activeCacheIds = new GridLongList();
     /** Per-transaction read map. */
 
     @GridToStringInclude
@@ -66,13 +68,13 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
 
     /** {@inheritDoc} */
     @Nullable @Override public Integer firstCacheId() {
-        return F.first(activeCacheIds);
+        return activeCacheIds.isEmpty() ? null : (int)activeCacheIds.get(0);
     }
 
     /** {@inheritDoc} */
     @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) {
         if (activeCacheIds.size() == 1) {
-            int cacheId = F.first(activeCacheIds);
+            int cacheId = (int)activeCacheIds.get(0);
 
             return cctx.cacheContext(cacheId);
         }
@@ -82,8 +84,11 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
 
     /** {@inheritDoc} */
     @Override public void awaitLastFut(GridCacheSharedContext cctx) {
-        for (Integer cacheId : activeCacheIds)
+        for (int i = 0; i < activeCacheIds.size(); i++) {
+            int cacheId = (int)activeCacheIds.get(i);
+
             cctx.cacheContext(cacheId).cache().awaitLastFut();
+        }
     }
 
     /** {@inheritDoc} */
@@ -91,7 +96,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
         GridDhtTopologyFuture topFut) {
         StringBuilder invalidCaches = null;
 
-        for (Integer cacheId : activeCacheIds) {
+        for (int i = 0; i < activeCacheIds.size(); i++) {
+            int cacheId = (int)activeCacheIds.get(i);
+
             GridCacheContext ctx = cctx.cacheContext(cacheId);
 
             assert ctx != null : cacheId;
@@ -113,7 +120,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
                 invalidCaches.toString());
         }
 
-        for (int cacheId : activeCacheIds) {
+        for (int i = 0; i < activeCacheIds.size(); i++) {
+            int cacheId = (int)activeCacheIds.get(i);
+
             GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
 
             if (CU.affinityNodes(cacheCtx, topFut.topologyVersion()).isEmpty()) {
@@ -127,7 +136,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
 
     /** {@inheritDoc} */
     @Override public boolean sync(GridCacheSharedContext cctx) {
-        for (int cacheId : activeCacheIds) {
+        for (int i = 0; i < activeCacheIds.size(); i++) {
+            int cacheId = (int)activeCacheIds.get(i);
+
             if (cctx.cacheContext(cacheId).config().getWriteSynchronizationMode() == FULL_SYNC)
                 return true;
         }
@@ -137,7 +148,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
 
     /** {@inheritDoc} */
     @Override public boolean hasNearCache(GridCacheSharedContext cctx) {
-        for (Integer cacheId : activeCacheIds) {
+        for (int i = 0; i < activeCacheIds.size(); i++) {
+            int cacheId = (int)activeCacheIds.get(i);
+
             GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
             if (cacheCtx.isNear())
@@ -163,7 +176,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
 
                 int idx = 0;
 
-                for (Integer activeCacheId : activeCacheIds) {
+                for (int i = 0; i < activeCacheIds.size(); i++) {
+                    int activeCacheId = (int)activeCacheIds.get(i);
+
                     cacheNames.append(cctx.cacheContext(activeCacheId).name());
 
                     if (idx++ < activeCacheIds.size() - 1)
@@ -192,7 +207,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
 
         GridCacheContext<?, ?> nonLocCtx = null;
 
-        for (int cacheId : activeCacheIds) {
+        for (int i = 0; i < activeCacheIds.size(); i++) {
+            int cacheId = (int)activeCacheIds.get(i);
+
             GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
 
             if (!cacheCtx.isLocal()) {
@@ -222,7 +239,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
         if (!activeCacheIds.isEmpty()) {
             GridCacheContext<?, ?> nonLocCtx = null;
 
-            for (int cacheId : activeCacheIds) {
+            for (int i = 0; i < activeCacheIds.size(); i++) {
+                int cacheId = (int)activeCacheIds.get(i);
+
                 GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
 
                 if (!cacheCtx.isLocal()) {
@@ -240,7 +259,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     /** {@inheritDoc} */
     @Override public boolean storeUsed(GridCacheSharedContext cctx) {
         if (!activeCacheIds.isEmpty()) {
-            for (int cacheId : activeCacheIds) {
+            for (int i = 0; i < activeCacheIds.size(); i++) {
+                int cacheId = (int)activeCacheIds.get(i);
+
                 CacheStoreManager store = cctx.cacheContext(cacheId).store();
 
                 if (store.configured())
@@ -252,13 +273,29 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean hasInterceptor(GridCacheSharedContext cctx) {
+        for (int i = 0; i < activeCacheIds.size(); i++) {
+            int cacheId = (int)activeCacheIds.get(i);
+
+            CacheInterceptor interceptor = cctx.cacheContext(cacheId).config().getInterceptor();
+
+            if (interceptor != null)
+                return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) {
-        Collection<Integer> cacheIds = activeCacheIds;
+        GridLongList cacheIds = activeCacheIds;
 
         if (!cacheIds.isEmpty()) {
             Collection<CacheStoreManager> stores = new ArrayList<>(cacheIds.size());
 
-            for (int cacheId : cacheIds) {
+            for (int i = 0; i < cacheIds.size(); i++) {
+                int cacheId = (int)cacheIds.get(i);
+
                 CacheStoreManager store = cctx.cacheContext(cacheId).store();
 
                 if (store.configured())
@@ -273,7 +310,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
 
     /** {@inheritDoc} */
     @Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) {
-        for (int cacheId : activeCacheIds) {
+        for (int i = 0; i < activeCacheIds.size(); i++) {
+            int cacheId = (int)activeCacheIds.get(i);
+
             GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
             onTxEnd(cacheCtx, tx, commit);

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
index 21d3fb6..b5c89cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
@@ -71,6 +72,9 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
     private NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> timeSyncHist =
         new GridBoundedConcurrentOrderedMap<>(MAX_TIME_SYNC_HISTORY);
 
+    /** Last recorded clock delta snapshot, paired with its version. */
+    private volatile T2<GridClockDeltaVersion, GridClockDeltaSnapshot> lastSnapshot;
+
     /** Time source. */
     private GridClockSource clockSrc;
 
@@ -99,7 +103,11 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
 
                 GridClockDeltaVersion ver = msg0.snapshotVersion();
 
-                timeSyncHist.put(ver, new GridClockDeltaSnapshot(ver, msg0.deltas()));
+                GridClockDeltaSnapshot snap = new GridClockDeltaSnapshot(ver, msg0.deltas());
+
+                lastSnapshot = new T2<>(ver, snap);
+
+                timeSyncHist.put(ver, snap);
             }
         });
 
@@ -265,11 +273,19 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
      * @return Adjusted time.
      */
     public long adjustedTime(long topVer) {
-        // Get last synchronized time on given topology version.
-        Map.Entry<GridClockDeltaVersion, GridClockDeltaSnapshot> entry = timeSyncHistory().lowerEntry(
-            new GridClockDeltaVersion(0, topVer + 1));
+        T2<GridClockDeltaVersion, GridClockDeltaSnapshot> fastSnap = lastSnapshot;
+
+        GridClockDeltaSnapshot snap;
 
-        GridClockDeltaSnapshot snap = entry == null ? null : entry.getValue();
+        if (fastSnap != null && fastSnap.get1().topologyVersion() == topVer)
+            snap = fastSnap.get2();
+        else {
+            // Get last synchronized time on given topology version.
+            Map.Entry<GridClockDeltaVersion, GridClockDeltaSnapshot> entry = timeSyncHistory().lowerEntry(
+                new GridClockDeltaVersion(0, topVer + 1));
+
+            snap = entry == null ? null : entry.getValue();
+        }
 
         long now = clockSrc.currentTimeMillis();
 
@@ -295,6 +311,8 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
             return;
 
         try {
+            lastSnapshot = new T2<>(snapshot.version(), snapshot);
+
             timeSyncHist.put(snapshot.version(), snapshot);
 
             for (ClusterNode n : top.topologyNodes()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/util/UUIDCollectionMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/UUIDCollectionMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/util/UUIDCollectionMessage.java
new file mode 100644
index 0000000..25e3376
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/UUIDCollectionMessage.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Communication {@link Message} that wraps a collection of UUIDs as its single field.
+ */
+public class UUIDCollectionMessage implements Message {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Wrapped UUIDs; written and read as one collection field named "uuids". */
+    @GridDirectCollection(UUID.class)
+    private Collection<UUID> uuids;
+
+    /**
+     * Empty constructor required for direct marshalling.
+     */
+    public UUIDCollectionMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param uuids UUIDs to wrap.
+     */
+    public UUIDCollectionMessage(Collection<UUID> uuids) {
+        this.uuids = uuids;
+    }
+
+    /**
+     * @return The collection of UUIDs that was wrapped.
+     */
+    public Collection<UUID> uuids() {
+        return uuids;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeCollection("uuids", uuids, MessageCollectionItemType.UUID))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                uuids = reader.readCollection("uuids", MessageCollectionItemType.UUID);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(UUIDCollectionMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 115; // Same value as the 'case 115' branch added to GridIoMessageFactory in this commit.
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1; // Only the 'uuids' field is marshalled.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 31674f1..5f0d411 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -53,7 +53,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
         AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "lsnrCalls");
 
     /** Futures. */
-    private final Collection<IgniteInternalFuture<T>> futs = new ArrayList<>();
+    protected final ArrayList<IgniteInternalFuture<T>> futs = new ArrayList<>();
 
     /** Reducer. */
     @GridToStringInclude
@@ -166,8 +166,19 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
      *
      * @return {@code True} if there are pending futures.
      */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
     public boolean hasPending() {
-        return !pending().isEmpty();
+        synchronized (futs) {
+            // Avoid iterator creation and collection copy.
+            for (int i = 0; i < futs.size(); i++) {
+                IgniteInternalFuture<T> fut = futs.get(i);
+
+                if (!fut.isDone())
+                    return true;
+            }
+        }
+
+        return false;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index c1d91a8..8d5a8e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -1866,6 +1866,8 @@ public class GridFunc {
 
         assert m != null;
 
+        final boolean hasPred = p != null && p.length > 0;
+
         return new GridSerializableMap<K, V1>() {
             /** Entry predicate. */
             private IgnitePredicate<Entry<K, V>> ep = new P1<Map.Entry<K, V>>() {
@@ -1911,7 +1913,7 @@ public class GridFunc {
                     }
 
                     @Override public int size() {
-                        return F.size(m.keySet(), p);
+                        return hasPred ? F.size(m.keySet(), p) : m.size();
                     }
 
                     @SuppressWarnings({"unchecked"})
@@ -1925,13 +1927,13 @@ public class GridFunc {
                     }
 
                     @Override public boolean isEmpty() {
-                        return !iterator().hasNext();
+                        return hasPred ? !iterator().hasNext() : m.isEmpty();
                     }
                 };
             }
 
             @Override public boolean isEmpty() {
-                return entrySet().isEmpty();
+                return hasPred ? entrySet().isEmpty() : m.isEmpty();
             }
 
             @SuppressWarnings({"unchecked"})

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 1824339..5bd08e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1301,6 +1301,8 @@ public class GridNioServer<T> {
         @SuppressWarnings("unchecked")
         private void bodyInternal() throws IgniteCheckedException {
             try {
+                long lastIdleCheck = U.currentTimeMillis();
+
                 while (!closed && selector.isOpen()) {
                     NioOperationFuture req;
 
@@ -1374,11 +1376,18 @@ public class GridNioServer<T> {
                     }
 
                     // Wake up every 2 seconds to check if closed.
-                    if (selector.select(2000) > 0)
+                    if (selector.select(2000) > 0) {
                         // Walk through the ready keys collection and process network events.
                         processSelectedKeys(selector.selectedKeys());
+                    }
 
-                    checkIdle(selector.keys());
+                    long now = U.currentTimeMillis();
+
+                    if (now - lastIdleCheck > 2000) {
+                        lastIdleCheck = now;
+
+                        checkIdle(selector.keys());
+                    }
                 }
             }
             // Ignore this exception as thread interruption is equal to 'close' call.

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionTest.java
index 8957c5d..6424b8b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionTest.java
@@ -285,7 +285,7 @@ public class IgniteCacheTxStoreSessionTest extends IgniteCacheStoreSessionAbstra
 
             expData.add(new ExpectedData(true, "write", new HashMap<>(), CACHE_NAME1));
             expData.add(new ExpectedData(true, "write", F.<Object, Object>asMap(0, "write"), null));
-            expData.add(new ExpectedData(true, "sessionEnd", F.<Object, Object>asMap(0, "write", 1, "write"), null));
+            expData.add(new ExpectedData(true, "sessionEnd", F.<Object, Object>asMap(0, "write", 1, "write"), CACHE_NAME1));
 
             tx.commit();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index c3c3659..ea13cdd 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePut
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxReentryNearSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingUnmarshallingFailedSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheDaemonNodeReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedAtomicGetAndTransformStoreSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedBasicApiTest;
@@ -138,6 +139,7 @@ public class IgniteCacheTestSuite3 extends TestSuite {
 
         suite.addTestSuite(GridCacheOrderedPreloadingSelfTest.class);
         suite.addTestSuite(GridCacheRebalancingSyncSelfTest.class);
+        suite.addTestSuite(GridCacheRebalancingUnmarshallingFailedSelfTest.class);
         suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class);
 
         // Test for byte array value special case.


[2/5] ignite git commit: Performance optimizations - reviewed by Yakov.

Posted by sb...@apache.org.
Performance optimizations - reviewed by Yakov.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/175b7f24
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/175b7f24
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/175b7f24

Branch: refs/heads/ignite-single-op-get
Commit: 175b7f24e1d62a90e7a7159ad670036216e6d278
Parents: 4c9ea58
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Wed Nov 18 19:20:45 2015 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Nov 18 19:20:45 2015 +0300

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |  11 +-
 .../communication/GridIoMessageFactory.java     |   6 +
 .../discovery/GridDiscoveryManager.java         |   2 +-
 .../cache/GridCacheDeploymentManager.java       |   2 +-
 .../processors/cache/GridCacheGateway.java      |   1 -
 .../processors/cache/GridCacheMvcc.java         |   7 --
 .../processors/cache/GridCacheMvccManager.java  |  42 -------
 .../GridCachePartitionExchangeManager.java      |  55 ++++++++-
 .../cache/GridCacheSharedContext.java           |   7 +-
 .../distributed/GridCacheTxRecoveryFuture.java  |  41 +++++--
 .../distributed/GridDistributedBaseMessage.java |  56 ---------
 .../distributed/GridDistributedLockRequest.java |   6 -
 .../GridDistributedLockResponse.java            |  32 +-----
 .../GridDistributedTxPrepareRequest.java        |  67 +++++++++--
 .../distributed/dht/GridDhtLockFuture.java      |  63 ++++++----
 .../distributed/dht/GridDhtLockRequest.java     |   2 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   5 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   9 ++
 .../distributed/dht/GridDhtTxPrepareFuture.java |  60 ++++++----
 .../dht/GridDhtTxPrepareRequest.java            |  54 ++++-----
 .../dht/colocated/GridDhtColocatedCache.java    |   2 +-
 .../colocated/GridDhtColocatedLockFuture.java   |  55 ++++++---
 .../distributed/near/GridNearLockFuture.java    |  56 ++++++---
 .../distributed/near/GridNearLockRequest.java   |   4 +-
 ...arOptimisticSerializableTxPrepareFuture.java |  91 +++++++++------
 .../near/GridNearOptimisticTxPrepareFuture.java |  50 +++++---
 .../GridNearPessimisticTxPrepareFuture.java     |  39 +++++--
 .../near/GridNearTransactionalCache.java        |   7 +-
 .../near/GridNearTxFinishFuture.java            |  18 ++-
 .../cache/distributed/near/GridNearTxLocal.java |  15 ---
 .../near/GridNearTxPrepareFutureAdapter.java    |  14 ++-
 .../near/GridNearTxPrepareRequest.java          |  52 ++++-----
 .../cache/transactions/IgniteInternalTx.java    |   1 +
 .../cache/transactions/IgniteTxAdapter.java     |   1 +
 .../cache/transactions/IgniteTxEntry.java       |   9 +-
 .../IgniteTxImplicitSingleStateImpl.java        |   7 ++
 .../transactions/IgniteTxLocalAdapter.java      |  12 +-
 .../cache/transactions/IgniteTxManager.java     |   1 +
 .../IgniteTxRemoteStateAdapter.java             |   5 +
 .../cache/transactions/IgniteTxState.java       |   6 +
 .../cache/transactions/IgniteTxStateImpl.java   |  69 ++++++++---
 .../clock/GridClockSyncProcessor.java           |  28 ++++-
 .../internal/util/UUIDCollectionMessage.java    | 114 +++++++++++++++++++
 .../util/future/GridCompoundFuture.java         |  15 ++-
 .../ignite/internal/util/lang/GridFunc.java     |   8 +-
 .../ignite/internal/util/nio/GridNioServer.java |  13 ++-
 .../IgniteCacheTxStoreSessionTest.java          |   2 +-
 .../testsuites/IgniteCacheTestSuite3.java       |   2 +
 48 files changed, 805 insertions(+), 419 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 8d9a3f5..74c71c4 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -43,6 +43,10 @@ import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.util.UUIDCollectionMessage;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
@@ -165,7 +169,12 @@ public class MessageCodeGenerator {
 
         MessageCodeGenerator gen = new MessageCodeGenerator(srcDir);
 
-        gen.generateAll(true);
+        gen.generateAndWrite(GridDistributedTxPrepareRequest.class);
+        gen.generateAndWrite(GridDhtTxPrepareRequest.class);
+        gen.generateAndWrite(GridNearTxPrepareRequest.class);
+        gen.generateAndWrite(UUIDCollectionMessage.class);
+
+//        gen.generateAll(true);
 
 //        gen.generateAndWrite(DataStreamerEntry.class);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index ae8c753..2503eda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -121,6 +121,7 @@ import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultRe
 import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultResponse;
 import org.apache.ignite.internal.util.GridByteArrayList;
 import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.UUIDCollectionMessage;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
@@ -690,6 +691,11 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 115:
+                msg = new UUIDCollectionMessage();
+
+                break;
+
             // [-3..114] - this
             // [120..123] - DR
             // [-4..-22] - SQL

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index cd2f49c..4880338 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -2136,7 +2136,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         customEvt.node(ctx.discovery().localNode());
                         customEvt.eventNode(node);
                         customEvt.type(type);
-                        customEvt.topologySnapshot(topVer.topologyVersion(), null);
+                        customEvt.topologySnapshot(topVer.topologyVersion(), evt.get4());
                         customEvt.affinityTopologyVersion(topVer);
                         customEvt.customMessage(evt.get5());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
index 40c5b0f..35e8b75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
@@ -164,7 +164,7 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
      * Callback on method enter.
      */
     public void onEnter() {
-        if (!locDepOwner && depEnabled && !ignoreOwnership.get()
+        if (depEnabled && !locDepOwner && !ignoreOwnership.get()
             && !cctx.kernalContext().job().internal()) {
             ClassLoader ldr = Thread.currentThread().getContextClassLoader();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index 0eac5ba..1562d70 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -109,7 +109,6 @@ public class GridCacheGateway<K, V> {
         rwLock.readLock();
 
         return checkState(true, false);
-
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
index 12583ad..adcbf92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
@@ -348,9 +348,6 @@ public final class GridCacheMvcc {
 
                     reassign();
 
-                    if (cand.local())
-                        cctx.mvcc().removeLocal(cand);
-
                     return true;
                 }
             }
@@ -596,8 +593,6 @@ public final class GridCacheMvcc {
         );
 
         if (serOrder == null) {
-            cctx.mvcc().addLocal(cand);
-
             boolean add = add0(cand);
 
             assert add : cand;
@@ -605,8 +600,6 @@ public final class GridCacheMvcc {
         else {
             if (!add0(cand))
                 return null;
-
-            cctx.mvcc().addLocal(cand);
         }
 
         return cand;

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 2c14209..8562f37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -29,7 +29,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
@@ -92,9 +91,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     private GridBoundedConcurrentLinkedHashSet<GridCacheVersion> rmvLocks =
         new GridBoundedConcurrentLinkedHashSet<>(MAX_REMOVED_LOCKS, MAX_REMOVED_LOCKS, 0.75f, 16, PER_SEGMENT_Q);
 
-    /** Current local candidates. */
-    private Collection<GridCacheMvccCandidate> dhtLocCands = new ConcurrentSkipListSet<>();
-
     /** Locked keys. */
     @GridToStringExclude
     private final ConcurrentMap<IgniteTxKey, GridDistributedCacheEntry> locked = newMap();
@@ -707,43 +703,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * @param cand Local lock.
-     * @return {@code True} if added.
-     */
-    public boolean addLocal(GridCacheMvccCandidate cand) {
-        assert cand.key() != null : cand;
-        assert cand.local() : cand;
-
-        if (cand.dhtLocal() && dhtLocCands.add(cand)) {
-            if (log.isDebugEnabled())
-                log.debug("Added local candidate: " + cand);
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     *
-     * @param cand Local candidate to remove.
-     * @return {@code True} if removed.
-     */
-    public boolean removeLocal(GridCacheMvccCandidate cand) {
-        assert cand.key() != null : cand;
-        assert cand.local() : cand;
-
-        if (cand.dhtLocal() && dhtLocCands.remove(cand)) {
-            if (log.isDebugEnabled())
-                log.debug("Removed local candidate: " + cand);
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
      * @param cacheCtx Cache context.
      * @param cand Cache lock candidate to add.
      * @return {@code True} if added as a result of this operation,
@@ -953,7 +912,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         X.println(">>> ");
         X.println(">>> Mvcc manager memory stats [grid=" + cctx.gridName() + ']');
         X.println(">>>   rmvLocksSize: " + rmvLocks.sizex());
-        X.println(">>>   dhtLocCandsSize: " + dhtLocCands.size());
         X.println(">>>   lockedSize: " + locked.size());
         X.println(">>>   futsSize: " + futs.size());
         X.println(">>>   near2dhtSize: " + near2dht.size());

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 81ff028..e19b310 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -30,6 +30,7 @@ import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -66,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridListSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgnitePair;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -77,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
@@ -134,6 +137,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     private final ConcurrentMap<AffinityTopologyVersion, AffinityReadyFuture> readyFuts = new ConcurrentHashMap8<>();
 
     /** */
+    private final ConcurrentSkipListMap<AffinityTopologyVersion, IgnitePair<IgniteProductVersion>> nodeVers =
+        new ConcurrentSkipListMap<>();
+
+    /** */
     private final AtomicReference<AffinityTopologyVersion> readyTopVer =
         new AtomicReference<>(AffinityTopologyVersion.NONE);
 
@@ -572,6 +579,30 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * Gets minimum node version for the given topology version.
+     *
+     * @param topVer Topology version to get minimum node version for.
+     * @return Minimum node version recorded for {@code topVer}, or local node version if nothing is recorded.
+     */
+    public IgniteProductVersion minimumNodeVersion(AffinityTopologyVersion topVer) {
+        IgnitePair<IgniteProductVersion> vers = nodeVers.get(topVer);
+
+        return vers == null ? cctx.localNode().version() : vers.get1();
+    }
+
+    /**
+     * Gets maximum node version for the given topology version.
+     *
+     * @param topVer Topology version to get maximum node version for.
+     * @return Maximum node version recorded for {@code topVer}, or local node version if nothing is recorded.
+     */
+    public IgniteProductVersion maximumNodeVersion(AffinityTopologyVersion topVer) {
+        IgnitePair<IgniteProductVersion> vers = nodeVers.get(topVer);
+
+        return vers == null ? cctx.localNode().version() : vers.get2();
+    }
+
+    /**
      * @return {@code true} if entered to busy state.
      */
     private boolean enterBusy() {
@@ -832,6 +863,28 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (log.isDebugEnabled())
             log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']');
 
+        IgniteProductVersion minVer = cctx.localNode().version();
+        IgniteProductVersion maxVer = cctx.localNode().version();
+
+        if (err == null) {
+            if (!F.isEmpty(exchFut.discoveryEvent().topologyNodes())) {
+                for (ClusterNode node : exchFut.discoveryEvent().topologyNodes()) {
+                    IgniteProductVersion ver = node.version();
+
+                    if (ver.compareTo(minVer) < 0)
+                        minVer = ver;
+
+                    if (ver.compareTo(maxVer) > 0)
+                        maxVer = ver;
+                }
+            }
+        }
+
+        nodeVers.put(topVer, new IgnitePair<>(minVer, maxVer));
+
+        for (AffinityTopologyVersion oldVer : nodeVers.headMap(new AffinityTopologyVersion(topVer.topologyVersion() - 10, 0)).keySet())
+            nodeVers.remove(oldVer);
+
         if (err == null) {
             while (true) {
                 AffinityTopologyVersion readyVer = readyTopVer.get();
@@ -1050,7 +1103,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
             int cnt = 0;
 
-            for (GridDhtPartitionsExchangeFuture fut : exchFuts) {
+            for (GridDhtPartitionsExchangeFuture fut : exchFuts.values()) {
                 U.warn(log, ">>> " + fut);
 
                 if (++cnt == 10)

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 4293b90..608829a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.transactions.TransactionMetricsAdapter;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -533,7 +534,7 @@ public class GridCacheSharedContext<K, V> {
      * @param cacheCtx Cache context.
      * @return Error message if transactions are incompatible.
      */
-    @Nullable public String verifyTxCompatibility(IgniteInternalTx tx, Iterable<Integer> activeCacheIds,
+    @Nullable public String verifyTxCompatibility(IgniteInternalTx tx, GridLongList activeCacheIds,
         GridCacheContext<K, V> cacheCtx) {
         if (cacheCtx.systemTx() && !tx.system())
             return "system cache can be enlisted only in system transaction";
@@ -541,7 +542,9 @@ public class GridCacheSharedContext<K, V> {
         if (!cacheCtx.systemTx() && tx.system())
             return "non-system cache can't be enlisted in system transaction";
 
-        for (Integer cacheId : activeCacheIds) {
+        for (int i = 0; i < activeCacheIds.size(); i++) {
+            int cacheId = (int)activeCacheIds.get(i);
+
             GridCacheContext<K, V> activeCacheCtx = cacheContext(cacheId);
 
             if (cacheCtx.systemTx()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index b266c4d..01c4867 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -342,20 +342,45 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
      */
     public void onResult(UUID nodeId, GridCacheTxRecoveryResponse res) {
         if (!isDone()) {
-            for (IgniteInternalFuture<Boolean> fut : pending()) {
-                if (isMini(fut)) {
-                    MiniFuture f = (MiniFuture)fut;
+            MiniFuture mini = miniFuture(res.miniId());
 
-                    if (f.futureId().equals(res.miniId())) {
-                        assert f.nodeId().equals(nodeId);
+            if (mini != null) {
+                assert mini.nodeId().equals(nodeId);
 
-                        f.onResult(res);
+                mini.onResult(res);
+            }
+        }
+    }
 
-                        break;
-                    }
+    /**
+     * Finds pending mini future by the given mini ID.
+     *
+     * @param miniId Mini ID to find.
+     * @return Matching mini future if it is not done yet, {@code null} if it is done or no match is found.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private MiniFuture miniFuture(IgniteUuid miniId) {
+        // We iterate directly over the futs collection here to avoid copy.
+        synchronized (futs) {
+            // Avoid iterator creation.
+            for (int i = 0; i < futs.size(); i++) {
+                IgniteInternalFuture<Boolean> fut = futs.get(i);
+
+                if (!isMini(fut))
+                    continue;
+
+                MiniFuture mini = (MiniFuture)fut;
+
+                if (mini.futureId().equals(miniId)) {
+                    if (!mini.isDone())
+                        return mini;
+                    else
+                        return null;
                 }
             }
         }
+
+        return null;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
index f4a16dc..ebbc9ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
@@ -21,13 +21,10 @@ import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionable;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -49,15 +46,6 @@ public abstract class GridDistributedBaseMessage extends GridCacheMessage implem
     @GridToStringInclude
     protected GridCacheVersion ver;
 
-    /**
-     * Candidates for every key ordered in the order of keys. These
-     * can be either local-only candidates in case of lock acquisition,
-     * or pending candidates in case of transaction commit.
-     */
-    @GridToStringInclude
-    @GridDirectTransient
-    private Collection<GridCacheMvccCandidate>[] candsByIdx;
-
     /** */
     @GridToStringExclude
     private byte[] candsByIdxBytes;
@@ -108,23 +96,6 @@ public abstract class GridDistributedBaseMessage extends GridCacheMessage implem
         this.ver = ver;
     }
 
-    /** {@inheritDoc}
-     * @param ctx*/
-    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
-        super.prepareMarshal(ctx);
-
-        if (candsByIdx != null)
-            candsByIdxBytes = ctx.marshaller().marshal(candsByIdx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
-        super.finishUnmarshal(ctx, ldr);
-
-        if (candsByIdxBytes != null)
-            candsByIdx = ctx.marshaller().unmarshal(candsByIdxBytes, ldr);
-    }
-
     /** {@inheritDoc} */
     @Override public boolean addDeploymentInfo() {
         return addDepInfo;
@@ -169,33 +140,6 @@ public abstract class GridDistributedBaseMessage extends GridCacheMessage implem
     }
 
     /**
-     * @param idx Key index.
-     * @param candsByIdx List of candidates for that key.
-     */
-    @SuppressWarnings({"unchecked"})
-    public void candidatesByIndex(int idx, Collection<GridCacheMvccCandidate> candsByIdx) {
-        assert idx < cnt;
-
-        // If nothing to add.
-        if (candsByIdx == null || candsByIdx.isEmpty())
-            return;
-
-        if (this.candsByIdx == null)
-            this.candsByIdx = new Collection[cnt];
-
-        this.candsByIdx[idx] = candsByIdx;
-    }
-
-    /**
-     * @param idx Key index.
-     * @return Candidates for given key.
-     */
-    public Collection<GridCacheMvccCandidate> candidatesByIndex(int idx) {
-        return candsByIdx == null ||
-            candsByIdx[idx] == null ? Collections.<GridCacheMvccCandidate>emptyList() : candsByIdx[idx];
-    }
-
-    /**
      * @return Count of keys referenced in candidates array (needed only locally for optimization).
      */
     public int keysCount() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
index 2899e25..b584f8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
@@ -19,14 +19,12 @@ package org.apache.ignite.internal.processors.cache.distributed;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -261,14 +259,12 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
      *
      * @param key Key.
      * @param retVal Flag indicating whether value should be returned.
-     * @param cands Candidates.
      * @param ctx Context.
      * @throws IgniteCheckedException If failed.
      */
     public void addKeyBytes(
         KeyCacheObject key,
         boolean retVal,
-        @Nullable Collection<GridCacheMvccCandidate> cands,
         GridCacheContext ctx
     ) throws IgniteCheckedException {
         if (keys == null)
@@ -276,8 +272,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
 
         keys.add(key);
 
-        candidatesByIndex(idx, cands);
-
         retVals[idx] = retVal;
 
         idx++;

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
index cdd58b5..bb3f9ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
@@ -26,7 +26,6 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -156,34 +155,11 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage {
     }
 
     /**
-     * @param idx Index of locked flag.
-     * @return Value of locked flag at given index.
-     */
-    public boolean isCurrentlyLocked(int idx) {
-        assert idx >= 0;
-
-        Collection<GridCacheMvccCandidate> cands = candidatesByIndex(idx);
-
-        for (GridCacheMvccCandidate cand : cands)
-            if (cand.owner())
-                return true;
-
-        return false;
-    }
-
-    /**
-     * @param idx Candidates index.
-     * @param cands Collection of candidates.
      * @param committedVers Committed versions relative to lock version.
      * @param rolledbackVers Rolled back versions relative to lock version.
      */
-    public void setCandidates(int idx, Collection<GridCacheMvccCandidate> cands,
-        Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) {
-        assert idx >= 0;
-
+    public void setCandidates(Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) {
         completedVersions(committedVers, rolledbackVers);
-
-        candidatesByIndex(idx, cands);
     }
 
     /**
@@ -218,9 +194,6 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage {
 
         prepareMarshalCacheObjects(vals, ctx.cacheContext(cacheId));
 
-//        if (F.isEmpty(valBytes) && !F.isEmpty(vals))
-//            valBytes = marshalValuesCollection(vals, ctx);
-
         if (err != null)
             errBytes = ctx.marshaller().marshal(err);
     }
@@ -231,9 +204,6 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage {
 
         finishUnmarshalCacheObjects(vals, ctx.cacheContext(cacheId), ldr);
 
-//        if (F.isEmpty(vals) && !F.isEmpty(valBytes))
-//            vals = unmarshalValueBytesCollection(valBytes, ctx, ldr);
-
         if (errBytes != null)
             err = ctx.marshaller().unmarshal(errBytes, ldr);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index 533c8ca..95176ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -34,9 +35,13 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.UUIDCollectionMessage;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -52,6 +57,23 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Version in which direct marshalling of tx nodes was introduced. */
+    public static final IgniteProductVersion TX_NODES_DIRECT_MARSHALLABLE_SINCE = IgniteProductVersion.fromString("1.5.0");
+
+    /** Collection to message converter. */
+    public static final C1<Collection<UUID>, UUIDCollectionMessage> COL_TO_MSG = new C1<Collection<UUID>, UUIDCollectionMessage>() {
+        @Override public UUIDCollectionMessage apply(Collection<UUID> uuids) {
+            return new UUIDCollectionMessage(uuids);
+        }
+    };
+
+    /** Message to collection converter. */
+    public static final C1<UUIDCollectionMessage, Collection<UUID>> MSG_TO_COL = new C1<UUIDCollectionMessage, Collection<UUID>>() {
+        @Override public Collection<UUID> apply(UUIDCollectionMessage msg) {
+            return msg.uuids();
+        }
+    };
+
     /** Thread ID. */
     @GridToStringInclude
     private long threadId;
@@ -106,6 +128,10 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
     @GridDirectTransient
     private Map<UUID, Collection<UUID>> txNodes;
 
+    /** Direct marshallable form of {@link #txNodes}: each node ID collection is wrapped into a message. */
+    @GridDirectMap(keyType = UUID.class, valueType = UUIDCollectionMessage.class)
+    private Map<UUID, UUIDCollectionMessage> txNodesMsg;
+
     /** */
     private byte[] txNodesBytes;
 
@@ -302,8 +328,16 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
             dhtVerVals = dhtVers.values();
         }
 
-        if (txNodes != null)
-            txNodesBytes = ctx.marshaller().marshal(txNodes);
+        // Marshal txNodes only if there is a node in topology with an older version.
+        if (ctx.exchange().minimumNodeVersion(topologyVersion())
+            .compareTo(TX_NODES_DIRECT_MARSHALLABLE_SINCE) < 0) {
+            if (txNodes != null && txNodesBytes == null)
+                txNodesBytes = ctx.marshaller().marshal(txNodes);
+        }
+        else {
+            if (txNodesMsg == null)
+                txNodesMsg = F.viewReadOnly(txNodes, COL_TO_MSG);
+        }
     }
 
     /** {@inheritDoc} */
@@ -334,7 +368,10 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
             }
         }
 
-        if (txNodesBytes != null)
+        if (txNodesMsg != null)
+            txNodes = F.viewReadOnly(txNodesMsg, MSG_TO_COL);
+
+        if (txNodesBytes != null && txNodes == null)
             txNodes = ctx.marshaller().unmarshal(txNodesBytes, ldr);
     }
 
@@ -431,18 +468,24 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeInt("txSize", txSize))
+                if (!writer.writeMap("txNodesMsg", txNodesMsg, MessageCollectionItemType.UUID, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeMessage("writeVer", writeVer))
+                if (!writer.writeInt("txSize", txSize))
                     return false;
 
                 writer.incrementState();
 
             case 21:
+                if (!writer.writeMessage("writeVer", writeVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 22:
                 if (!writer.writeCollection("writes", writes, MessageCollectionItemType.MSG))
                     return false;
 
@@ -569,7 +612,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
                 reader.incrementState();
 
             case 19:
-                txSize = reader.readInt("txSize");
+                txNodesMsg = reader.readMap("txNodesMsg", MessageCollectionItemType.UUID, MessageCollectionItemType.MSG, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -577,7 +620,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
                 reader.incrementState();
 
             case 20:
-                writeVer = reader.readMessage("writeVer");
+                txSize = reader.readInt("txSize");
 
                 if (!reader.isLastRead())
                     return false;
@@ -585,6 +628,14 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
                 reader.incrementState();
 
             case 21:
+                writeVer = reader.readMessage("writeVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 22:
                 writes = reader.readCollection("writes", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -604,7 +655,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 22;
+        return 23;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 579d701..7284fd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -380,10 +380,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
      * @return Lock candidate.
      * @throws GridCacheEntryRemovedException If entry was removed.
      * @throws GridDistributedLockCancelledException If lock is canceled.
-     * @throws IgniteCheckedException If failed.
      */
     @Nullable public GridCacheMvccCandidate addEntry(GridDhtCacheEntry entry)
-        throws GridCacheEntryRemovedException, GridDistributedLockCancelledException, IgniteCheckedException {
+        throws GridCacheEntryRemovedException, GridDistributedLockCancelledException {
         if (log.isDebugEnabled())
             log.debug("Adding entry: " + entry);
 
@@ -529,35 +528,57 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
             if (log.isDebugEnabled())
                 log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" + this + ']');
 
-            boolean found = false;
+            MiniFuture mini = miniFuture(res.miniId());
 
-            for (IgniteInternalFuture<Boolean> fut : pending()) {
-                if (isMini(fut)) {
-                    MiniFuture mini = (MiniFuture)fut;
+            if (mini != null) {
+                assert mini.node().id().equals(nodeId);
 
-                    if (mini.futureId().equals(res.miniId())) {
-                        assert mini.node().id().equals(nodeId);
+                if (log.isDebugEnabled())
+                    log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']');
 
-                        if (log.isDebugEnabled())
-                            log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']');
+                mini.onResult(res);
 
-                        found = true;
+                if (log.isDebugEnabled())
+                    log.debug("Futures after processed lock response [fut=" + this + ", mini=" + mini +
+                        ", res=" + res + ']');
 
-                        mini.onResult(res);
+                return;
+            }
 
-                        if (log.isDebugEnabled())
-                            log.debug("Futures after processed lock response [fut=" + this + ", mini=" + mini +
-                                ", res=" + res + ']');
+            U.warn(log, "Failed to find mini future for response (perhaps due to stale message) [res=" + res +
+                ", fut=" + this + ']');
+        }
+    }
 
-                        break;
-                    }
+    /**
+     * Finds pending mini future by the given mini ID.
+     *
+     * @param miniId Mini ID to find.
+     * @return Mini future.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private MiniFuture miniFuture(IgniteUuid miniId) {
+        // We iterate directly over the futs collection here to avoid copy.
+        synchronized (futs) {
+            // Avoid iterator creation.
+            for (int i = 0; i < futs.size(); i++) {
+                IgniteInternalFuture<Boolean> fut = futs.get(i);
+
+                if (!isMini(fut))
+                    continue;
+
+                MiniFuture mini = (MiniFuture)fut;
+
+                if (mini.futureId().equals(miniId)) {
+                    if (!mini.isDone())
+                        return mini;
+                    else
+                        return null;
                 }
             }
-
-            if (!found)
-                U.warn(log, "Failed to find mini future for response (perhaps due to stale message) [res=" + res +
-                    ", fut=" + this + ']');
         }
+
+        return null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
index 91ab1ca..18281d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@ -236,7 +236,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest {
     ) throws IgniteCheckedException {
         invalidateEntries.set(idx, invalidateEntry);
 
-        addKeyBytes(key, false, null, ctx);
+        addKeyBytes(key, false, ctx);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index fe91e5b..35f63e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -39,7 +39,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
 import org.apache.ignite.internal.processors.cache.GridCacheLockTimeoutException;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
@@ -187,8 +186,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
             IgniteTxKey txKey = ctx.txKey(key);
 
-            assert F.isEmpty(req.candidatesByIndex(i));
-
             if (log.isDebugEnabled())
                 log.debug("Unmarshalled key: " + key);
 
@@ -671,7 +668,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                         if (log.isDebugEnabled())
                             log.debug("Got removed entry when adding lock (will retry): " + entry);
                     }
-                    catch (IgniteCheckedException | GridDistributedLockCancelledException e) {
+                    catch (GridDistributedLockCancelledException e) {
                         if (log.isDebugEnabled())
                             log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']');
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 70ebf3f..55ca12d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -181,6 +181,15 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     }
 
     /**
+     * Gets flag that indicates that originating node has a near cache that participates in this transaction.
+     *
+     * @return Has near cache flag.
+     */
+    public boolean nearOnOriginatingNode() {
+        return nearOnOriginatingNode;
+    }
+
+    /**
      * @return {@code True} if explicit lock transaction.
      */
     public boolean explicitLock() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index a67950d..d081c0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -450,20 +450,45 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      */
     public void onResult(UUID nodeId, GridDhtTxPrepareResponse res) {
         if (!isDone()) {
-            for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
-                if (isMini(fut)) {
-                    MiniFuture f = (MiniFuture)fut;
+            MiniFuture mini = miniFuture(res.miniId());
 
-                    if (f.futureId().equals(res.miniId())) {
-                        assert f.node().id().equals(nodeId);
+            if (mini != null) {
+                assert mini.node().id().equals(nodeId);
 
-                        f.onResult(res);
+                mini.onResult(res);
+            }
+        }
+    }
 
-                        break;
-                    }
+    /**
+     * Finds pending mini future by the given mini ID.
+     *
+     * @param miniId Mini ID to find.
+     * @return Mini future.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private MiniFuture miniFuture(IgniteUuid miniId) {
+        // We iterate directly over the futs collection here to avoid copy.
+        synchronized (futs) {
+            // Avoid iterator creation.
+            for (int i = 0; i < futs.size(); i++) {
+                IgniteInternalFuture<IgniteInternalTx> fut = futs.get(i);
+
+                if (!isMini(fut))
+                    continue;
+
+                MiniFuture mini = (MiniFuture)fut;
+
+                if (mini.futureId().equals(miniId)) {
+                    if (!mini.isDone())
+                        return mini;
+                    else
+                        return null;
                 }
             }
         }
+
+        return null;
     }
 
     /**
@@ -693,7 +718,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
             tx.activeCachesDeploymentEnabled());
 
         if (prepErr == null) {
-            addDhtValues(res);
+            if (tx.needReturnValue() || tx.nearOnOriginatingNode() || tx.hasInterceptor())
+                addDhtValues(res);
 
             GridCacheVersion min = tx.minVersion();
 
@@ -949,7 +975,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 }
             }
         }
-        catch (GridCacheEntryRemovedException e) {
+        catch (GridCacheEntryRemovedException ignore) {
             assert false : "Got removed exception on entry with dht local candidate: " + entries;
         }
 
@@ -1072,18 +1098,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
                             GridCacheContext<?, ?> cacheCtx = cached.context();
 
-                            if (entry.explicitVersion() == null) {
-                                GridCacheMvccCandidate added = cached.candidate(version());
-
-                                assert added != null : "Null candidate for non-group-lock entry " +
-                                    "[added=" + added + ", entry=" + entry + ']';
-                                assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
-                                    "[added=" + added + ", entry=" + entry + ']';
-
-                                if (added != null && added.ownerVersion() != null)
-                                    req.owned(entry.txKey(), added.ownerVersion());
-                            }
-
                             // Do not invalidate near entry on originating transaction node.
                             req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) &&
                                 cached.readerId(n.id()) != null);
@@ -1092,7 +1106,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                                 List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(),
                                     tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion());
 
-                                // Do not preload if local node is partition owner.
+                                // Do not preload if local node is a partition owner.
                                 if (!owners.contains(cctx.localNode()))
                                     req.markKeyForPreload(idx);
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index fcd66c2..394ff89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -345,79 +345,79 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
         }
 
         switch (writer.state()) {
-            case 22:
+            case 23:
                 if (!writer.writeIgniteUuid("futId", futId))
                     return false;
 
                 writer.incrementState();
 
-            case 23:
+            case 24:
                 if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries))
                     return false;
 
                 writer.incrementState();
 
-            case 24:
+            case 25:
                 if (!writer.writeBoolean("last", last))
                     return false;
 
                 writer.incrementState();
 
-            case 25:
+            case 26:
                 if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
-            case 26:
+            case 27:
                 if (!writer.writeUuid("nearNodeId", nearNodeId))
                     return false;
 
                 writer.incrementState();
 
-            case 27:
+            case 28:
                 if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 28:
+            case 29:
                 if (!writer.writeMessage("nearXidVer", nearXidVer))
                     return false;
 
                 writer.incrementState();
 
-            case 29:
+            case 30:
                 if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 30:
+            case 31:
                 if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 31:
+            case 32:
                 if (!writer.writeBitSet("preloadKeys", preloadKeys))
                     return false;
 
                 writer.incrementState();
 
-            case 32:
+            case 33:
                 if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
-            case 33:
+            case 34:
                 if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
-            case 34:
+            case 35:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -439,7 +439,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
             return false;
 
         switch (reader.state()) {
-            case 22:
+            case 23:
                 futId = reader.readIgniteUuid("futId");
 
                 if (!reader.isLastRead())
@@ -447,7 +447,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 23:
+            case 24:
                 invalidateNearEntries = reader.readBitSet("invalidateNearEntries");
 
                 if (!reader.isLastRead())
@@ -455,7 +455,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 24:
+            case 25:
                 last = reader.readBoolean("last");
 
                 if (!reader.isLastRead())
@@ -463,7 +463,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 25:
+            case 26:
                 miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
@@ -471,7 +471,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 26:
+            case 27:
                 nearNodeId = reader.readUuid("nearNodeId");
 
                 if (!reader.isLastRead())
@@ -479,7 +479,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 27:
+            case 28:
                 nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -487,7 +487,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 28:
+            case 29:
                 nearXidVer = reader.readMessage("nearXidVer");
 
                 if (!reader.isLastRead())
@@ -495,7 +495,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 29:
+            case 30:
                 ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -503,7 +503,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 30:
+            case 31:
                 ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -511,7 +511,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 31:
+            case 32:
                 preloadKeys = reader.readBitSet("preloadKeys");
 
                 if (!reader.isLastRead())
@@ -519,7 +519,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 32:
+            case 33:
                 subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
@@ -527,7 +527,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 33:
+            case 34:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -535,7 +535,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 34:
+            case 35:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -555,6 +555,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 35;
+        return 36;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 83c220d..7131aa5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -873,7 +873,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                         if (log.isDebugEnabled())
                             log.debug("Got removed entry when adding lock (will retry): " + entry);
                     }
-                    catch (IgniteCheckedException | GridDistributedLockCancelledException e) {
+                    catch (GridDistributedLockCancelledException e) {
                         if (log.isDebugEnabled())
                             log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']');
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 365b46b..abeb509 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -428,25 +428,21 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                 log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" +
                     this + ']');
 
-            for (IgniteInternalFuture<Boolean> fut : pending()) {
-                if (isMini(fut)) {
-                    MiniFuture mini = (MiniFuture)fut;
+            MiniFuture mini = miniFuture(res.miniId());
 
-                    if (mini.futureId().equals(res.miniId())) {
-                        assert mini.node().id().equals(nodeId);
+            if (mini != null) {
+                assert mini.node().id().equals(nodeId);
 
-                        if (log.isDebugEnabled())
-                            log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']');
+                if (log.isDebugEnabled())
+                    log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']');
 
-                        mini.onResult(res);
+                mini.onResult(res);
 
-                        if (log.isDebugEnabled())
-                            log.debug("Future after processed lock response [fut=" + this + ", mini=" + mini +
-                                ", res=" + res + ']');
+                if (log.isDebugEnabled())
+                    log.debug("Future after processed lock response [fut=" + this + ", mini=" + mini +
+                        ", res=" + res + ']');
 
-                        return;
-                    }
-                }
+                return;
             }
 
             U.warn(log, "Failed to find mini future for response (perhaps due to stale message) [res=" + res +
@@ -458,6 +454,37 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
     }
 
     /**
+     * Finds pending mini future by the given mini ID.
+     *
+     * @param miniId Mini ID to find.
+     * @return Mini future.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private MiniFuture miniFuture(IgniteUuid miniId) {
+        // We iterate directly over the futs collection here to avoid copy.
+        synchronized (futs) {
+            // Avoid iterator creation.
+            for (int i = 0; i < futs.size(); i++) {
+                IgniteInternalFuture<Boolean> fut = futs.get(i);
+
+                if (!isMini(fut))
+                    continue;
+
+                MiniFuture mini = (MiniFuture)fut;
+
+                if (mini.futureId().equals(miniId)) {
+                    if (!mini.isDone())
+                        return mini;
+                    else
+                        return null;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
      * @param t Error.
      */
     private void onError(Throwable t) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index c5b55bd..9c3701f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -478,25 +478,21 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
             if (log.isDebugEnabled())
                 log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" + this + ']');
 
-            for (IgniteInternalFuture<Boolean> fut : pending()) {
-                if (isMini(fut)) {
-                    MiniFuture mini = (MiniFuture)fut;
+            MiniFuture mini = miniFuture(res.miniId());
 
-                    if (mini.futureId().equals(res.miniId())) {
-                        assert mini.node().id().equals(nodeId);
+            if (mini != null) {
+                assert mini.node().id().equals(nodeId);
 
-                        if (log.isDebugEnabled())
-                            log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']');
+                if (log.isDebugEnabled())
+                    log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']');
 
-                        mini.onResult(res);
+                mini.onResult(res);
 
-                        if (log.isDebugEnabled())
-                            log.debug("Future after processed lock response [fut=" + this + ", mini=" + mini +
-                                ", res=" + res + ']');
+                if (log.isDebugEnabled())
+                    log.debug("Future after processed lock response [fut=" + this + ", mini=" + mini +
+                        ", res=" + res + ']');
 
-                        return;
-                    }
-                }
+                return;
             }
 
             U.warn(log, "Failed to find mini future for response (perhaps due to stale message) [res=" + res +
@@ -508,6 +504,38 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
     }
 
     /**
+     * Finds pending mini future by the given mini ID.
+     *
+     * @param miniId Mini ID to find.
+     * @return Mini future.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private MiniFuture miniFuture(IgniteUuid miniId) {
+        // We iterate directly over the futs collection here to avoid copy.
+        synchronized (futs) {
+            // Avoid iterator creation.
+            for (int i = 0; i < futs.size(); i++) {
+                IgniteInternalFuture<Boolean> fut = futs.get(i);
+
+                if (!isMini(fut))
+                    continue;
+
+                MiniFuture mini = (MiniFuture)fut;
+
+                if (mini.futureId().equals(miniId)) {
+                    if (!mini.isDone())
+                        return mini;
+                    else
+                        return null;
+                }
+            }
+        }
+
+        return null;
+    }
+
+
+    /**
      * @param t Error.
      */
     private void onError(Throwable t) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index 165da84..805a6a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -20,13 +20,11 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockRequest;
@@ -300,7 +298,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
         dhtVers[idx] = dhtVer;
 
         // Delegate to super.
-        addKeyBytes(key, retVal, (Collection<GridCacheMvccCandidate>)null, ctx);
+        addKeyBytes(key, retVal, ctx);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 29774a5..1569b14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedExceptio
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -210,17 +211,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     /** {@inheritDoc} */
     @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
         if (!isDone()) {
-            for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) {
-                if (isMini(fut)) {
-                    MiniFuture f = (MiniFuture)fut;
+            MiniFuture mini = miniFuture(res.miniId());
 
-                    if (f.futureId().equals(res.miniId())) {
-                        assert f.node().id().equals(nodeId);
-
-                        f.onResult(res);
-                    }
-                }
-            }
+            if (mini != null)
+                mini.onResult(res);
         }
     }
 
@@ -239,6 +233,37 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     }
 
     /**
+     * Finds pending mini future by the given mini ID.
+     *
+     * @param miniId Mini ID to find.
+     * @return Pending mini future, or {@code null} if none matches or the matching one is already done.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private MiniFuture miniFuture(IgniteUuid miniId) {
+        // We iterate directly over the futs collection here to avoid copy.
+        synchronized (futs) {
+            // Avoid iterator creation.
+            for (int i = 0; i < futs.size(); i++) {
+                IgniteInternalFuture<GridNearTxPrepareResponse> fut = futs.get(i);
+
+                if (!isMini(fut))
+                    continue;
+
+                MiniFuture mini = (MiniFuture)fut;
+
+                if (mini.futureId().equals(miniId)) {
+                    if (!mini.isDone())
+                        return mini;
+                    else
+                        return null;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
      * @param f Future.
      * @return {@code True} if mini-future.
      */
@@ -276,32 +301,27 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
      * @param remap Remap flag.
      */
     @Override protected void prepare0(boolean remap, boolean topLocked) {
-        try {
-            boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING);
-
-            if (!txStateCheck) {
-                if (tx.setRollbackOnly()) {
-                    if (tx.timedOut())
-                        onError(null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
-                            "was rolled back: " + this));
-                    else
-                        onError(null, new IgniteCheckedException("Invalid transaction state for prepare " +
-                            "[state=" + tx.state() + ", tx=" + this + ']'));
-                }
-                else
-                    onError(null, new IgniteTxRollbackCheckedException("Invalid transaction state for " +
-                        "prepare [state=" + tx.state() + ", tx=" + this + ']'));
+        boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING);
 
-                return;
+        if (!txStateCheck) {
+            if (tx.setRollbackOnly()) {
+                if (tx.timedOut())
+                    onError(null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
+                        "was rolled back: " + this));
+                else
+                    onError(null, new IgniteCheckedException("Invalid transaction state for prepare " +
+                        "[state=" + tx.state() + ", tx=" + this + ']'));
             }
+            else
+                onError(null, new IgniteTxRollbackCheckedException("Invalid transaction state for " +
+                    "prepare [state=" + tx.state() + ", tx=" + this + ']'));
 
-            prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked);
-
-            markInitialized();
-        }
-        catch (IgniteCheckedException e) {
-            onDone(e);
+            return;
         }
+
+        prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked);
+
+        markInitialized();
     }
 
     /**
@@ -309,7 +329,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
      * @param writes Write entries.
      * @param remap Remap flag.
      * @param topLocked Topology locked flag.
-     * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
     private void prepare(
@@ -317,7 +336,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
         Iterable<IgniteTxEntry> writes,
         boolean remap,
         boolean topLocked
-    ) throws IgniteCheckedException {
+    ) {
         AffinityTopologyVersion topVer = tx.topologyVersion();
 
         assert topVer.topologyVersion() > 0;
@@ -355,9 +374,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
         for (GridDistributedTxMapping m : mappings.values()) {
             assert !m.empty();
 
-            MiniFuture fut = new MiniFuture(m);
-
-            add(fut);
+            add(new MiniFuture(m));
         }
 
         Collection<IgniteInternalFuture<?>> futs = (Collection)futures();

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 791d2f3..82e3868 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
@@ -187,18 +188,45 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
     /** {@inheritDoc} */
     @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
         if (!isDone()) {
-            for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) {
-                if (isMini(fut)) {
-                    MiniFuture f = (MiniFuture)fut;
+            MiniFuture mini = miniFuture(res.miniId());
 
-                    if (f.futureId().equals(res.miniId())) {
-                        assert f.node().id().equals(nodeId);
+            if (mini != null) {
+                assert mini.node().id().equals(nodeId);
 
-                        f.onResult(nodeId, res);
-                    }
+                mini.onResult(nodeId, res);
+            }
+        }
+    }
+
+    /**
+     * Finds pending mini future by the given mini ID.
+     *
+     * @param miniId Mini ID to find.
+     * @return Pending mini future, or {@code null} if none matches or the matching one is already done.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private MiniFuture miniFuture(IgniteUuid miniId) {
+        // We iterate directly over the futs collection here to avoid copy.
+        synchronized (futs) {
+            // Avoid iterator creation.
+            for (int i = 0; i < futs.size(); i++) {
+                IgniteInternalFuture<GridNearTxPrepareResponse> fut = futs.get(i);
+
+                if (!isMini(fut))
+                    continue;
+
+                MiniFuture mini = (MiniFuture)fut;
+
+                if (mini.futureId().equals(miniId)) {
+                    if (!mini.isDone())
+                        return mini;
+                    else
+                        return null;
                 }
             }
         }
+
+        return null;
     }
 
     /** {@inheritDoc} */
@@ -277,10 +305,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
             markInitialized();
         }
         catch (TransactionTimeoutException e) {
-            onError( e);
-        }
-        catch (IgniteCheckedException e) {
-            onDone(e);
+            onError(e);
         }
     }
 
@@ -327,12 +352,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
     /**
      * @param writes Write entries.
      * @param topLocked {@code True} if thread already acquired lock preventing topology change.
-     * @throws IgniteCheckedException If failed.
      */
     private void prepare(
         Iterable<IgniteTxEntry> writes,
         boolean topLocked
-    ) throws IgniteCheckedException {
+    ) {
         AffinityTopologyVersion topVer = tx.topologyVersion();
 
         assert topVer.topologyVersion() > 0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 1554a62..103105e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -103,20 +103,45 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
         if (!isDone()) {
             assert res.clientRemapVersion() == null : res;
 
-            for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) {
-                MiniFuture f = (MiniFuture)fut;
+            MiniFuture f = miniFuture(res.miniId());
 
-                if (f.futureId().equals(res.miniId())) {
-                    assert f.node().id().equals(nodeId);
+            if (f != null) {
+                assert f.node().id().equals(nodeId);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + f);
+                if (log.isDebugEnabled())
+                    log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + f);
 
-                    f.onResult(res);
+                f.onResult(res);
+            }
+        }
+    }
+
+    /**
+     * Finds pending mini future by the given mini ID.
+     *
+     * @param miniId Mini ID to find.
+     * @return Pending mini future, or {@code null} if none matches or the matching one is already done.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private MiniFuture miniFuture(IgniteUuid miniId) {
+        // We iterate directly over the futs collection here to avoid copy.
+        synchronized (futs) {
+            // Avoid iterator creation.
+            for (int i = 0; i < futs.size(); i++) {
+                MiniFuture mini = (MiniFuture)futs.get(i);
+
+                if (mini.futureId().equals(miniId)) {
+                    if (!mini.isDone())
+                        return mini;
+                    else
+                        return null;
                 }
             }
         }
+
+        return null;
     }
+
     /** {@inheritDoc} */
     @Override public void prepare() {
         if (!tx.state(PREPARING)) {


[5/5] ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-1.5' into ignite-single-op-get

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-1.5' into ignite-single-op-get


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a68645a6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a68645a6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a68645a6

Branch: refs/heads/ignite-single-op-get
Commit: a68645a610d81443e89e9e290c91be955dbdb8ec
Parents: 6c685b8
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 18 23:21:10 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 18 23:21:10 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/processors/cache/GridCacheIoManager.java     | 4 ++--
 .../cache/distributed/near/GridNearSingleGetRequest.java         | 2 +-
 .../cache/distributed/near/GridNearSingleGetResponse.java        | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a68645a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 5c3eb3c..9afbca8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -520,7 +520,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             break;
 
-            case 115: {
+            case 116: {
                 GridNearSingleGetRequest req = (GridNearSingleGetRequest)msg;
 
                 GridNearSingleGetResponse res = new GridNearSingleGetResponse(
@@ -538,7 +538,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             break;
 
-            case 116: {
+            case 117: {
                 GridNearSingleGetResponse res = (GridNearSingleGetResponse)msg;
 
                 GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)ctx.mvcc().future(res.futureId());

http://git-wip-us.apache.org/repos/asf/ignite/blob/a68645a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
index 073df94..a506007 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
@@ -381,7 +381,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa
 
     /** {@inheritDoc} */
     @Override public byte directType() {
-        return 115;
+        return 116;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a68645a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
index 66b4d9b..ba0081c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
@@ -306,7 +306,7 @@ public class GridNearSingleGetResponse extends GridCacheMessage implements GridC
 
     /** {@inheritDoc} */
     @Override public byte directType() {
-        return 116;
+        return 117;
     }
 
     /** {@inheritDoc} */


[3/5] ignite git commit: IGNITE-1790 Implement Apache Camel streamer.

Posted by sb...@apache.org.
IGNITE-1790 Implement Apache Camel streamer.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c490de38
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c490de38
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c490de38

Branch: refs/heads/ignite-single-op-get
Commit: c490de38c7b841fe51fec1001368bda096e82100
Parents: 175b7f2
Author: Raul Kripalani <ra...@apache.org>
Authored: Wed Nov 18 18:03:23 2015 +0000
Committer: Raul Kripalani <ra...@apache.org>
Committed: Wed Nov 18 18:03:23 2015 +0000

----------------------------------------------------------------------
 modules/camel/pom.xml                           | 102 +++++
 .../ignite/stream/camel/CamelStreamer.java      | 237 +++++++++++
 .../stream/camel/IgniteCamelStreamerTest.java   | 420 +++++++++++++++++++
 .../camel/IgniteCamelStreamerTestSuite.java     |  48 +++
 .../src/test/resources/camel.test.properties    |  18 +
 .../org/apache/ignite/stream/StreamAdapter.java |  19 +-
 pom.xml                                         |   1 +
 7 files changed, 835 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/pom.xml
----------------------------------------------------------------------
diff --git a/modules/camel/pom.xml b/modules/camel/pom.xml
new file mode 100644
index 0000000..60f0597
--- /dev/null
+++ b/modules/camel/pom.xml
@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent</relativePath>
+    </parent>
+
+    <artifactId>ignite-camel</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <url>http://ignite.apache.org</url>
+
+    <properties>
+        <camel.version>2.16.0</camel.version>
+        <guava.version>18.0</guava.version>
+        <okhttp.version>2.5.0</okhttp.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core</artifactId>
+            <version>${camel.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-log4j</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-spring</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-jetty</artifactId>
+            <version>${camel.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>${okhttp.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
----------------------------------------------------------------------
diff --git a/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java b/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
new file mode 100644
index 0000000..40ed6b3
--- /dev/null
+++ b/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.camel;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.stream.StreamAdapter;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
+
+/**
+ * This streamer consumes messages from an Apache Camel consumer endpoint and feeds them into an Ignite data streamer.
+ *
+ * The only mandatory properties are {@link #endpointUri} and the appropriate stream tuple extractor (either {@link
+ * StreamSingleTupleExtractor} or {@link StreamMultipleTupleExtractor}).
+ *
+ * The user can also provide a custom {@link CamelContext} in case they want to attach custom components, a {@link
+ * org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies, etc.
+ *
+ * @see <a href="http://camel.apache.org">Apache Camel</a>
+ * @see <a href="http://camel.apache.org/components.html">Apache Camel components</a>
+ */
+public class CamelStreamer<K, V> extends StreamAdapter<Exchange, K, V> implements Processor {
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** The Camel Context. */
+    private CamelContext camelCtx;
+
+    /** The endpoint URI to consume from. */
+    private String endpointUri;
+
+    /** Camel endpoint. */
+    private Endpoint endpoint;
+
+    /** Camel consumer. */
+    private Consumer consumer;
+
+    /** A {@link Processor} to generate the response. */
+    private Processor resProc;
+
+    /**
+     * Starts the streamer.
+     *
+     * @throws IgniteException If the streamer failed to start.
+     */
+    public void start() throws IgniteException {
+        // Ensure that the endpoint URI is provided.
+        A.notNullOrEmpty(endpointUri, "endpoint URI must be provided");
+
+        // Check that at least one tuple extractor is provided.
+        A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null),
+            "tuple extractor missing");
+
+        // If a custom CamelContext is not provided, initialize one.
+        if (camelCtx == null)
+            camelCtx = new DefaultCamelContext();
+
+        // If the Camel Context is starting or started, reject this call to start.
+        if (camelCtx.getStatus() == ServiceStatus.Started || camelCtx.getStatus() == ServiceStatus.Starting)
+            throw new IgniteException("Failed to start Camel streamer (CamelContext already started or starting).");
+
+        log = getIgnite().log();
+
+        // Instantiate the Camel endpoint.
+        try {
+            endpoint = CamelContextHelper.getMandatoryEndpoint(camelCtx, endpointUri);
+        }
+        catch (Exception e) {
+            U.error(log, e);
+
+            throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
+        }
+
+        // Create the Camel consumer.
+        try {
+            consumer = endpoint.createConsumer(this);
+        }
+        catch (Exception e) {
+            U.error(log, e);
+
+            throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
+        }
+
+        // Start the Camel services.
+        try {
+            ServiceHelper.startServices(camelCtx, endpoint, consumer);
+        }
+        catch (Exception e) {
+            U.error(log, e);
+
+            try {
+                ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer);
+
+                consumer = null;
+                endpoint = null;
+            }
+            catch (Exception e1) {
+                throw new IgniteException("Failed to start Camel streamer; failed to stop the context, endpoint or " +
+                    "consumer during rollback of failed initialization [errMsg=" + e.getMessage() + ", stopErrMsg=" +
+                    e1.getMessage() + ']');
+            }
+
+            throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
+        }
+
+        U.log(log, "Started Camel streamer consuming from endpoint URI: " + endpointUri);
+    }
+
+    /**
+     * Stops the streamer.
+     *
+     * @throws IgniteException If the streamer failed to stop.
+     */
+    public void stop() throws IgniteException {
+        // If the Camel Context is stopping or stopped, reject this call to stop.
+        if (camelCtx.getStatus() == ServiceStatus.Stopped || camelCtx.getStatus() == ServiceStatus.Stopping)
+            throw new IgniteException("Failed to stop Camel streamer (CamelContext already stopped or stopping).");
+
+        // Stop Camel services.
+        try {
+            ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer);
+        }
+        catch (Exception e) {
+            throw new IgniteException("Failed to stop Camel streamer [errMsg=" + e.getMessage() + ']');
+        }
+
+        U.log(log, "Stopped Camel streamer, formerly consuming from endpoint URI: " + endpointUri);
+    }
+
+    /**
+     * Processes the incoming {@link Exchange} and adds the tuple(s) to the underlying streamer.
+     *
+     * @param exchange The Camel Exchange.
+     */
+    @Override public void process(Exchange exchange) throws Exception {
+        // Extract and insert the tuple(s).
+        if (getMultipleTupleExtractor() == null) {
+            Map.Entry<K, V> entry = getSingleTupleExtractor().extract(exchange);
+            getStreamer().addData(entry);
+        }
+        else {
+            Map<K, V> entries = getMultipleTupleExtractor().extract(exchange);
+            getStreamer().addData(entries);
+        }
+
+        // If the user has set a response processor, invoke it before finishing.
+        if (resProc != null)
+            resProc.process(exchange);
+    }
+
+    /**
+     * Gets the underlying {@link CamelContext}, whether created automatically by Ignite or the context specified by the
+     * user.
+     *
+     * @return The Camel Context.
+     */
+    public CamelContext getCamelContext() {
+        return camelCtx;
+    }
+
+    /**
+     * Explicitly sets the {@link CamelContext} to use.
+     *
+     * Doing so gives the user the opportunity to attach custom components, a {@link
+     * org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies, etc.
+     *
+     * @param camelCtx The Camel Context to use. In most cases, an instance of {@link DefaultCamelContext}.
+     */
+    public void setCamelContext(CamelContext camelCtx) {
+        this.camelCtx = camelCtx;
+    }
+
+    /**
+     * Gets the endpoint URI from which to consume.
+     *
+     * @return The endpoint URI.
+     */
+    public String getEndpointUri() {
+        return endpointUri;
+    }
+
+    /**
+     * Sets the endpoint URI from which to consume. <b>Mandatory.</b>
+     *
+     * @param endpointUri The endpoint URI.
+     */
+    public void setEndpointUri(String endpointUri) {
+        this.endpointUri = endpointUri;
+    }
+
+    /**
+     * Gets the {@link Processor} used to generate the response.
+     *
+     * @return The {@link Processor}.
+     */
+    public Processor getResponseProcessor() {
+        return resProc;
+    }
+
+    /**
+     * Sets the {@link Processor} used to generate the response.
+     *
+     * @param resProc The {@link Processor}.
+     */
+    public void setResponseProcessor(Processor resProc) {
+        this.resProc = resProc;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
new file mode 100644
index 0000000..4795dff
--- /dev/null
+++ b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.camel;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.component.properties.PropertiesComponent;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.LifecycleStrategySupport;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.util.lang.GridMapEntry;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.squareup.okhttp.MediaType;
+import com.squareup.okhttp.OkHttpClient;
+import com.squareup.okhttp.Request;
+import com.squareup.okhttp.RequestBody;
+import com.squareup.okhttp.Response;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+
+/**
+ * Test class for {@link CamelStreamer}.
+ */
+public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
+    /** text/plain media type. */
+    private static final MediaType TEXT_PLAIN = MediaType.parse("text/plain;charset=utf-8");
+
+    /** The test data. */
+    private static final Map<Integer, String> TEST_DATA = new HashMap<>();
+
+    /** The Camel streamer currently under test. */
+    private CamelStreamer<Integer, String> streamer;
+
+    /** The Ignite data streamer. */
+    private IgniteDataStreamer<Integer, String> dataStreamer;
+
+    /** URL where the REST service will be exposed. */
+    private String url;
+
+    /** The UUID of the currently active remote listener. */
+    private UUID remoteLsnr;
+
+    /** The OkHttpClient. */
+    private OkHttpClient httpClient = new OkHttpClient();
+
+    // Initialize the test data.
+    static {
+        for (int i = 0; i < 100; i++)
+            TEST_DATA.put(i, "v" + i);
+    }
+
+    /** Constructor. */
+    public IgniteCamelStreamerTest() {
+        super(true);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override public void beforeTest() throws Exception {
+        grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+
+        // find an available local port
+        try (ServerSocket ss = new ServerSocket(0)) {
+            int port = ss.getLocalPort();
+
+            url = "http://localhost:" + port + "/ignite";
+        }
+
+        // create Camel streamer
+        dataStreamer = grid().dataStreamer(null);
+        streamer = createCamelStreamer(dataStreamer);
+    }
+
+    @Override public void afterTest() throws Exception {
+        try {
+            streamer.stop();
+        }
+        catch (Exception e) {
+            // ignore if already stopped
+        }
+
+        dataStreamer.close();
+
+        grid().cache(null).clear();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSendOneEntryPerMessage() throws Exception {
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+
+        // Subscribe to cache PUT events.
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // Action time.
+        streamer.start();
+
+        // Send messages.
+        sendMessages(0, 50, false);
+
+        // Assertions.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultipleEntriesInOneMessage() throws Exception {
+        streamer.setMultipleTupleExtractor(multipleTupleExtractor());
+
+        // Subscribe to cache PUT events.
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // Action time.
+        streamer.start();
+
+        // Send messages.
+        sendMessages(0, 50, true);
+
+        // Assertions.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testResponseProcessorIsCalled() throws Exception {
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setResponseProcessor(new Processor() {
+            @Override public void process(Exchange exchange) throws Exception {
+                exchange.getOut().setBody("Foo bar");
+            }
+        });
+
+        // Subscribe to cache PUT events.
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // Action time.
+        streamer.start();
+
+        // Send messages.
+        List<String> responses = sendMessages(0, 50, false);
+
+        for (String r : responses)
+            assertEquals("Foo bar", r);
+
+        // Assertions.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUserSpecifiedCamelContext() throws Exception {
+        final AtomicInteger cnt = new AtomicInteger();
+
+        // Create a CamelContext with a probe that'll help us know if it has been used.
+        CamelContext context = new DefaultCamelContext();
+        context.setTracing(true);
+        context.addLifecycleStrategy(new LifecycleStrategySupport() {
+            @Override public void onEndpointAdd(Endpoint endpoint) {
+                cnt.incrementAndGet();
+            }
+        });
+
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setCamelContext(context);
+
+        // Subscribe to cache PUT events.
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // Action time.
+        streamer.start();
+
+        // Send messages.
+        sendMessages(0, 50, false);
+
+        // Assertions.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+        assertTrue(cnt.get() > 0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUserSpecifiedCamelContextWithPropertyPlaceholders() throws Exception {
+        // Create a CamelContext with a custom property placeholder.
+        CamelContext context = new DefaultCamelContext();
+
+        PropertiesComponent pc = new PropertiesComponent("camel.test.properties");
+
+        context.addComponent("properties", pc);
+
+        // Replace the context path in the test URL with the property placeholder.
+        url = url.replaceAll("/ignite", "{{test.contextPath}}");
+
+        // Recreate the Camel streamer with the new URL.
+        streamer = createCamelStreamer(dataStreamer);
+
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setCamelContext(context);
+
+        // Subscribe to cache PUT events.
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // Action time.
+        streamer.start();
+
+        // Before sending the messages, get the actual URL after the property placeholder was resolved,
+        // stripping the jetty: prefix from it.
+        url = streamer.getCamelContext().getEndpoints().iterator().next().getEndpointUri().replaceAll("jetty:", "");
+
+        // Send messages.
+        sendMessages(0, 50, false);
+
+        // Assertions.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvalidEndpointUri() throws Exception {
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setEndpointUri("abc");
+
+        // Action time.
+        try {
+            streamer.start();
+            fail("Streamer started; should have failed.");
+        }
+        catch (IgniteException e) {
+            assertTrue(streamer.getCamelContext().getStatus() == ServiceStatus.Stopped);
+            assertTrue(streamer.getCamelContext().getEndpointRegistry().size() == 0);
+        }
+    }
+
+    /**
+     * Creates a Camel streamer.
+     */
+    private CamelStreamer<Integer, String> createCamelStreamer(IgniteDataStreamer<Integer, String> dataStreamer) {
+        CamelStreamer<Integer, String> streamer = new CamelStreamer<>();
+
+        streamer.setIgnite(grid());
+        streamer.setStreamer(dataStreamer);
+        streamer.setEndpointUri("jetty:" + url);
+
+        dataStreamer.allowOverwrite(true);
+        dataStreamer.autoFlushFrequency(1);
+
+        return streamer;
+    }
+
+    /**
+     * @throws IOException If executing an HTTP request or reading its response body fails.
+     * @return HTTP response payloads.
+     */
+    private List<String> sendMessages(int fromIdx, int cnt, boolean singleMessage) throws IOException {
+        List<String> responses = Lists.newArrayList();
+
+        if (singleMessage) {
+            StringBuilder sb = new StringBuilder();
+
+            for (int i = fromIdx; i < fromIdx + cnt; i++)
+                sb.append(i).append(",").append(TEST_DATA.get(i)).append("\n");
+
+            Request request = new Request.Builder()
+                .url(url)
+                .post(RequestBody.create(TEXT_PLAIN, sb.toString()))
+                .build();
+
+            Response response = httpClient.newCall(request).execute();
+
+            responses.add(response.body().string());
+        }
+        else {
+            for (int i = fromIdx; i < fromIdx + cnt; i++) {
+                String payload = i + "," + TEST_DATA.get(i);
+
+                Request request = new Request.Builder()
+                    .url(url)
+                    .post(RequestBody.create(TEXT_PLAIN, payload))
+                    .build();
+
+                Response response = httpClient.newCall(request).execute();
+
+                responses.add(response.body().string());
+            }
+        }
+
+        return responses;
+    }
+
+    /**
+     * Returns a {@link StreamSingleTupleExtractor} for testing.
+     */
+    private static StreamSingleTupleExtractor<Exchange, Integer, String> singleTupleExtractor() {
+        return new StreamSingleTupleExtractor<Exchange, Integer, String>() {
+            @Override public Map.Entry<Integer, String> extract(Exchange exchange) {
+                List<String> s = Splitter.on(",").splitToList(exchange.getIn().getBody(String.class));
+
+                return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
+            }
+        };
+    }
+
+    /**
+     * Returns a {@link StreamMultipleTupleExtractor} for testing.
+     */
+    private static StreamMultipleTupleExtractor<Exchange, Integer, String> multipleTupleExtractor() {
+        return new StreamMultipleTupleExtractor<Exchange, Integer, String>() {
+            @Override public Map<Integer, String> extract(Exchange exchange) {
+                final Map<String, String> map = Splitter.on("\n")
+                    .omitEmptyStrings()
+                    .withKeyValueSeparator(",")
+                    .split(exchange.getIn().getBody(String.class));
+
+                final Map<Integer, String> answer = new HashMap<>();
+
+                F.forEach(map.keySet(), new IgniteInClosure<String>() {
+                    @Override public void apply(String s) {
+                        answer.put(Integer.parseInt(s), map.get(s));
+                    }
+                });
+
+                return answer;
+            }
+        };
+    }
+
+    /**
+     * Subscribe to cache put events.
+     */
+    private CountDownLatch subscribeToPutEvents(int expect) {
+        Ignite ignite = grid();
+
+        // Listen to cache PUT events and expect as many messages as test data items
+        final CountDownLatch latch = new CountDownLatch(expect);
+        @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> callback =
+            new IgniteBiPredicate<UUID, CacheEvent>() {
+            @Override public boolean apply(UUID uuid, CacheEvent evt) {
+                latch.countDown();
+
+                return true;
+            }
+        };
+
+        remoteLsnr = ignite.events(ignite.cluster().forCacheNodes(null))
+            .remoteListen(callback, null, EVT_CACHE_OBJECT_PUT);
+
+        return latch;
+    }
+
+    /**
+     * Assert a given number of cache entries have been loaded.
+     */
+    private void assertCacheEntriesLoaded(int cnt) {
+        // get the cache and check that the entries are present
+        IgniteCache<Integer, String> cache = grid().cache(null);
+
+        // for each key from 0 to count from the TEST_DATA (ordered by key), check that the entry is present in cache
+        for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, cnt))
+            assertEquals(TEST_DATA.get(key), cache.get(key));
+
+        // assert that the cache contains exactly the specified number of elements
+        assertEquals(cnt, cache.size(CachePeekMode.ALL));
+
+        // remove the event listener
+        grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteLsnr);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java
new file mode 100644
index 0000000..266c9cf
--- /dev/null
+++ b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.camel;
+
+import java.util.Set;
+
+import junit.framework.TestSuite;
+
+/**
+ * Camel streamer tests.
+ */
+public class IgniteCamelStreamerTestSuite extends TestSuite {
+    /**
+     * @return {@link IgniteCamelStreamerTest} test suite.
+     * @throws Exception Thrown in case of the failure.
+     */
+    public static TestSuite suite() throws Exception {
+        return suite(null);
+    }
+
+    /**
+     * @param ignoredTests Tests to ignore (not used by this suite).
+     * @return Test suite.
+     * @throws Exception Thrown in case of the failure.
+     */
+    public static TestSuite suite(Set<Class> ignoredTests) throws Exception {
+        TestSuite suite = new TestSuite("IgniteCamelStreamer Test Suite");
+
+        suite.addTestSuite(IgniteCamelStreamerTest.class);
+
+        return suite;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/src/test/resources/camel.test.properties
----------------------------------------------------------------------
diff --git a/modules/camel/src/test/resources/camel.test.properties b/modules/camel/src/test/resources/camel.test.properties
new file mode 100644
index 0000000..30459be
--- /dev/null
+++ b/modules/camel/src/test/resources/camel.test.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+test.contextPath = /ignite-properties

http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
index 2cb7db7..afc1530 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -37,7 +37,6 @@ import org.apache.ignite.IgniteDataStreamer;
  * </ol>
  */
 public abstract class StreamAdapter<T, K, V> {
-
     /** Tuple extractor extracting a single tuple from an event */
     private StreamSingleTupleExtractor<T, K, V> singleTupleExtractor;
 
@@ -99,9 +98,9 @@ public abstract class StreamAdapter<T, K, V> {
      */
     @Deprecated
     public StreamTupleExtractor<T, K, V> getTupleExtractor() {
-        if (singleTupleExtractor instanceof StreamTupleExtractor) {
+        if (singleTupleExtractor instanceof StreamTupleExtractor)
             return (StreamTupleExtractor) singleTupleExtractor;
-        }
+
         throw new IllegalArgumentException("This method is deprecated and only relevant if using an old " +
             "StreamTupleExtractor; use getSingleTupleExtractor instead");
     }
@@ -112,9 +111,9 @@ public abstract class StreamAdapter<T, K, V> {
      */
     @Deprecated
     public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
-        if (multipleTupleExtractor != null) {
+        if (multipleTupleExtractor != null)
             throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
-        }
+
         this.singleTupleExtractor = extractor;
     }
 
@@ -129,9 +128,9 @@ public abstract class StreamAdapter<T, K, V> {
      * @param singleTupleExtractor Extractor for key-value tuples from messages.
      */
     public void setSingleTupleExtractor(StreamSingleTupleExtractor<T, K, V> singleTupleExtractor) {
-        if (multipleTupleExtractor != null) {
+        if (multipleTupleExtractor != null)
             throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
-        }
+
         this.singleTupleExtractor = singleTupleExtractor;
     }
 
@@ -146,9 +145,9 @@ public abstract class StreamAdapter<T, K, V> {
      * @param multipleTupleExtractor Extractor for 1:n tuple extraction.
      */
     public void setMultipleTupleExtractor(StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor) {
-        if (singleTupleExtractor != null) {
+        if (singleTupleExtractor != null)
             throw new IllegalArgumentException("Single tuple extractor already set; cannot set both types at once.");
-        }
+
         this.multipleTupleExtractor = multipleTupleExtractor;
     }
 
@@ -188,4 +187,4 @@ public abstract class StreamAdapter<T, K, V> {
         }
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c40b551..b9c51b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,6 +78,7 @@
         <module>modules/jms11</module>
         <module>modules/mqtt</module>
         <module>modules/zookeeper</module>
+        <module>modules/camel</module>
         <module>modules/platform</module>
     </modules>
 


[4/5] ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-1.5' into ignite-single-op-get

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-1.5' into ignite-single-op-get


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6c685b8c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6c685b8c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6c685b8c

Branch: refs/heads/ignite-single-op-get
Commit: 6c685b8c9290fab77f8359444285fe04709e428b
Parents: 4e1caa6 c490de3
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 18 23:20:53 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 18 23:20:53 2015 +0300

----------------------------------------------------------------------
 modules/camel/pom.xml                           | 102 +++++
 .../ignite/stream/camel/CamelStreamer.java      | 237 +++++++++++
 .../stream/camel/IgniteCamelStreamerTest.java   | 420 +++++++++++++++++++
 .../camel/IgniteCamelStreamerTestSuite.java     |  48 +++
 .../src/test/resources/camel.test.properties    |  18 +
 .../ignite/codegen/MessageCodeGenerator.java    |  11 +-
 .../communication/GridIoMessageFactory.java     |   8 +-
 .../discovery/GridDiscoveryManager.java         |   2 +-
 .../cache/GridCacheDeploymentManager.java       |   2 +-
 .../processors/cache/GridCacheGateway.java      |   1 -
 .../processors/cache/GridCacheMvcc.java         |   7 -
 .../processors/cache/GridCacheMvccManager.java  |  42 --
 .../GridCachePartitionExchangeManager.java      |  55 ++-
 .../cache/GridCacheSharedContext.java           |   7 +-
 .../distributed/GridCacheTxRecoveryFuture.java  |  41 +-
 .../distributed/GridDistributedBaseMessage.java |  56 ---
 .../distributed/GridDistributedLockRequest.java |   6 -
 .../GridDistributedLockResponse.java            |  32 +-
 .../GridDistributedTxPrepareRequest.java        |  67 ++-
 .../distributed/dht/GridDhtLockFuture.java      |  63 ++-
 .../distributed/dht/GridDhtLockRequest.java     |   2 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   4 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   9 +
 .../distributed/dht/GridDhtTxPrepareFuture.java |  60 ++-
 .../dht/GridDhtTxPrepareRequest.java            |  54 +--
 .../dht/colocated/GridDhtColocatedCache.java    |   2 +-
 .../colocated/GridDhtColocatedLockFuture.java   |  55 ++-
 .../distributed/near/GridNearLockFuture.java    |  56 ++-
 .../distributed/near/GridNearLockRequest.java   |   4 +-
 ...arOptimisticSerializableTxPrepareFuture.java |  91 ++--
 .../near/GridNearOptimisticTxPrepareFuture.java |  50 ++-
 .../GridNearPessimisticTxPrepareFuture.java     |  39 +-
 .../near/GridNearTransactionalCache.java        |   7 +-
 .../near/GridNearTxFinishFuture.java            |  18 +-
 .../cache/distributed/near/GridNearTxLocal.java |  15 -
 .../near/GridNearTxPrepareFutureAdapter.java    |  14 +-
 .../near/GridNearTxPrepareRequest.java          |  52 +--
 .../cache/transactions/IgniteInternalTx.java    |   1 +
 .../cache/transactions/IgniteTxAdapter.java     |   1 +
 .../cache/transactions/IgniteTxEntry.java       |   9 +-
 .../IgniteTxImplicitSingleStateImpl.java        |   7 +
 .../transactions/IgniteTxLocalAdapter.java      |  12 +-
 .../cache/transactions/IgniteTxManager.java     |   1 +
 .../IgniteTxRemoteStateAdapter.java             |   5 +
 .../cache/transactions/IgniteTxState.java       |   6 +
 .../cache/transactions/IgniteTxStateImpl.java   |  69 ++-
 .../clock/GridClockSyncProcessor.java           |  28 +-
 .../internal/util/UUIDCollectionMessage.java    | 114 +++++
 .../util/future/GridCompoundFuture.java         |  15 +-
 .../ignite/internal/util/lang/GridFunc.java     |   8 +-
 .../ignite/internal/util/nio/GridNioServer.java |  13 +-
 .../org/apache/ignite/stream/StreamAdapter.java |  19 +-
 .../IgniteCacheTxStoreSessionTest.java          |   2 +-
 .../testsuites/IgniteCacheTestSuite3.java       |   2 +
 pom.xml                                         |   1 +
 55 files changed, 1641 insertions(+), 429 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6c685b8c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index e7b61a8,2503eda..3548aac
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@@ -693,14 -692,9 +694,19 @@@ public class GridIoMessageFactory imple
                  break;
  
              case 115:
-                 msg = new GridNearSingleGetRequest();
+                 msg = new UUIDCollectionMessage();
  
                  break;
 +
 +            case 116:
++                msg = new GridNearSingleGetRequest();
++
++                break;
++
++            case 117:
 +                msg = new GridNearSingleGetResponse();
 +
 +                break;
  
              // [-3..114] - this
              // [120..123] - DR

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c685b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index f28f9b7,8562f37..9104acb
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@@ -28,9 -29,9 +28,8 @@@ import java.util.Set
  import java.util.UUID;
  import java.util.concurrent.ConcurrentLinkedQueue;
  import java.util.concurrent.ConcurrentMap;
- import java.util.concurrent.ConcurrentSkipListSet;
  import org.apache.ignite.IgniteCheckedException;
  import org.apache.ignite.IgniteLogger;
 -import org.apache.ignite.cluster.ClusterNode;
  import org.apache.ignite.events.DiscoveryEvent;
  import org.apache.ignite.events.Event;
  import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@@ -953,9 -912,8 +912,8 @@@ public class GridCacheMvccManager exten
          X.println(">>> ");
          X.println(">>> Mvcc manager memory stats [grid=" + cctx.gridName() + ']');
          X.println(">>>   rmvLocksSize: " + rmvLocks.sizex());
-         X.println(">>>   dhtLocCandsSize: " + dhtLocCands.size());
          X.println(">>>   lockedSize: " + locked.size());
 -        X.println(">>>   futsSize: " + futs.size());
 +        X.println(">>>   futsSize: " + (mvccFuts.size() + futs.size()));
          X.println(">>>   near2dhtSize: " + near2dht.size());
          X.println(">>>   finishFutsSize: " + finishFuts.sizex());
      }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c685b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c685b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c685b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c685b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c685b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c685b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c685b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c685b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c685b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c685b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c685b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c685b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c685b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c685b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c685b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c685b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------