You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2017/04/11 10:06:09 UTC

[20/22] ignite git commit: ignite-4571 - reviewed contribution

ignite-4571 - reviewed contribution


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b99c1980
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b99c1980
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b99c1980

Branch: refs/heads/ignite-3477-master
Commit: b99c1980dfe250b3f24a94eb4b6cb948bb314ab5
Parents: 5c48260
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Mon Apr 10 18:29:33 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Mon Apr 10 18:29:33 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAtomicFuture.java |  2 +-
 .../processors/cache/GridCacheMvccManager.java  | 84 ++++++++++++++++----
 .../cache/GridDeferredAckMessageSender.java     | 11 ++-
 .../GridDhtAtomicAbstractUpdateFuture.java      |  4 +-
 .../GridNearAtomicAbstractUpdateFuture.java     |  2 +-
 .../GridNearAtomicSingleUpdateFuture.java       | 21 ++---
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 23 +++---
 .../cache/transactions/IgniteTxManager.java     |  2 +-
 8 files changed, 102 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 8df229e..87ae29c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -27,7 +27,7 @@ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
     /**
      * @return Future ID.
      */
-    public Long id();
+    public long id();
 
     /**
      * Gets future that will be completed when it is safe when update is finished on the given version of topology.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index dff2c88..712d136 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -81,6 +81,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     /** Maxim number of removed locks. */
     private static final int MAX_REMOVED_LOCKS = 10240;
 
+    /** Maximum number of atomic IDs reserved per thread at a time. Must be a power of two! */
+    protected static final int THREAD_RESERVE_SIZE = 0x4000;
+
     /** */
     private static final int MAX_NESTED_LSNR_CALLS = getInteger(IGNITE_MAX_NESTED_LISTENER_CALLS, 5);
 
@@ -106,9 +109,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     @GridToStringExclude
     private final ConcurrentMap<GridCacheVersion, Collection<GridCacheMvccFuture<?>>> mvccFuts = newMap();
 
-    /** */
-    private final AtomicLong atomicFutId = new AtomicLong(U.currentTimeMillis());
-
     /** Pending atomic futures. */
     private final ConcurrentHashMap8<Long, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>();
 
@@ -138,6 +138,16 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     /** */
     private volatile boolean stopping;
 
+    /** Global atomic id counter. */
+    protected final AtomicLong globalAtomicCnt = new AtomicLong();
+
+    /** Per-thread atomic ID counter; its initial value is reserved from {@link #globalAtomicCnt}. */
+    private final ThreadLocal<LongWrapper> threadAtomicCnt = new ThreadLocal<LongWrapper>() {
+        @Override protected LongWrapper initialValue() {
+            return new LongWrapper(globalAtomicCnt.getAndAdd(THREAD_RESERVE_SIZE));
+        }
+    };
+
     /** Lock callback. */
     @GridToStringExclude
     private final GridCacheMvccCallback cb = new GridCacheMvccCallback() {
@@ -256,9 +266,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                 cacheFut.onNodeLeft(discoEvt.eventNode().id());
 
                 if (cacheFut.isCancelled() || cacheFut.isDone()) {
-                    Long futId = cacheFut.id();
+                    long futId = cacheFut.id();
 
-                    if (futId != null)
+                    if (futId > 0)
                         atomicFuts.remove(futId, cacheFut);
                 }
             }
@@ -426,18 +436,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * @return ID for atomic cache update future.
-     */
-    public long atomicFutureId() {
-        return atomicFutId.incrementAndGet();
-    }
-
-    /**
      * @param futId Future ID.
      * @param fut Future.
      * @return {@code False} if future was forcibly completed with error.
      */
-    public boolean addAtomicFuture(Long futId, GridCacheAtomicFuture<?> fut) {
+    public boolean addAtomicFuture(long futId, GridCacheAtomicFuture<?> fut) {
         IgniteInternalFuture<?> old = atomicFuts.put(futId, fut);
 
         assert old == null : "Old future is not null [futId=" + futId + ", fut=" + fut + ", old=" + old + ']';
@@ -472,7 +475,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @param futId Future ID.
      * @return Future.
      */
-    @Nullable public IgniteInternalFuture<?> atomicFuture(Long futId) {
+    @Nullable public IgniteInternalFuture<?> atomicFuture(long futId) {
         return atomicFuts.get(futId);
     }
 
@@ -480,7 +483,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @param futId Future ID.
      * @return Removed future.
      */
-    @Nullable public IgniteInternalFuture<?> removeAtomicFuture(Long futId) {
+    @Nullable public IgniteInternalFuture<?> removeAtomicFuture(long futId) {
         return atomicFuts.remove(futId);
     }
 
@@ -1174,6 +1177,20 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @return Next atomic future ID for the calling thread; a new ID range is reserved at each range boundary.
+     */
+    public long nextAtomicId() {
+        LongWrapper cnt = threadAtomicCnt.get();
+
+        long res = cnt.getAndIncrement();
+
+        if ((cnt.get() & (THREAD_RESERVE_SIZE - 1)) == 0)
+            cnt.set(globalAtomicCnt.getAndAdd(THREAD_RESERVE_SIZE));
+
+        return res;
+    }
+
+    /**
      *
      */
     private class FinishLockFuture extends GridFutureAdapter<Object> {
@@ -1382,4 +1399,41 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
             return S.toString(DataStreamerFuture.class, this, super.toString());
         }
     }
+
+    /** Mutable, unsynchronized {@code long} holder; used as the value type of the per-thread ID counter. */
+    private static class LongWrapper {
+        /** Current value. */
+        private long val;
+
+        /**
+         * @param val Base value; stored as {@code val + 1}, or as {@code 1} if that sum is zero.
+         */
+        public LongWrapper(long val) {
+            this.val = val + 1;
+
+            if (this.val == 0)
+                this.val = 1;
+        }
+
+        /**
+         * @param val Value to set (stored as is, without the adjustment made by the constructor).
+         */
+        public void set(long val) {
+            this.val = val;
+        }
+
+        /**
+         * @return Current value.
+         */
+        public long get() {
+            return val;
+        }
+
+        /**
+         * @return Value before the increment (the incremented value is stored).
+         */
+        public long getAndIncrement() {
+            return val++;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
index 5265ec9..89aa725 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
@@ -21,7 +21,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -33,7 +32,7 @@ import org.jsr166.ConcurrentLinkedDeque8;
 /**
  *
  */
-public abstract class GridDeferredAckMessageSender {
+public abstract class GridDeferredAckMessageSender<T> {
     /** Deferred message buffers. */
     private ConcurrentMap<UUID, DeferredAckMessageBuffer> deferredAckMsgBuffers = new ConcurrentHashMap8<>();
 
@@ -67,7 +66,7 @@ public abstract class GridDeferredAckMessageSender {
      * @param nodeId Node ID.
      * @param vers Versions to send.
      */
-    public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers);
+    public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8<T> vers);
 
     /**
      *
@@ -81,7 +80,7 @@ public abstract class GridDeferredAckMessageSender {
      * @param nodeId Node ID to send message to.
      * @param ver Version to ack.
      */
-    public void sendDeferredAckMessage(UUID nodeId, GridCacheVersion ver) {
+    public void sendDeferredAckMessage(UUID nodeId, T ver) {
         while (true) {
             DeferredAckMessageBuffer buf = deferredAckMsgBuffers.get(nodeId);
 
@@ -117,7 +116,7 @@ public abstract class GridDeferredAckMessageSender {
         private AtomicBoolean guard = new AtomicBoolean(false);
 
         /** Versions. */
-        private ConcurrentLinkedDeque8<GridCacheVersion> vers = new ConcurrentLinkedDeque8<>();
+        private ConcurrentLinkedDeque8<T> vers = new ConcurrentLinkedDeque8<>();
 
         /** Node ID. */
         private final UUID nodeId;
@@ -173,7 +172,7 @@ public abstract class GridDeferredAckMessageSender {
          * @param ver Version to send.
          * @return {@code True} if request was handled, {@code false} if this buffer is filled and cannot be used.
          */
-        public boolean add(GridCacheVersion ver) {
+        public boolean add(T ver) {
             readLock().lock();
 
             boolean snd = false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 5ff5aa4..0940acb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -110,7 +110,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         this.updateReq = updateReq;
         this.writeVer = writeVer;
 
-        futId = cctx.mvcc().atomicFutureId();
+        futId = cctx.mvcc().nextAtomicId();
 
         if (log == null) {
             msgLog = cctx.shared().atomicMessageLogger();
@@ -295,7 +295,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     }
 
     /** {@inheritDoc} */
-    @Override public final Long id() {
+    @Override public final long id() {
         return futId;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 39abb73..a2adb05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -138,7 +138,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
     /** Future ID. */
     @GridToStringInclude
-    protected Long futId;
+    protected long futId;
 
     /** Operation result. */
     protected GridCacheReturn opRes;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index c2372d1..e4ba457 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -125,7 +125,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     }
 
     /** {@inheritDoc} */
-    @Override public Long id() {
+    @Override public long id() {
         synchronized (mux) {
             return futId;
         }
@@ -216,7 +216,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         AffinityTopologyVersion remapTopVer0;
 
         synchronized (mux) {
-            if (futId == null || futId != res.futureId())
+            if (futId == 0 || futId != res.futureId())
                 return;
 
             assert reqState != null;
@@ -258,7 +258,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         CachePartialUpdateCheckedException err0 = null;
 
         synchronized (mux) {
-            if (futId == null || futId != res.futureId())
+            if (futId == 0 || futId != res.futureId())
                 return;
 
             req = reqState.processPrimaryResponse(nodeId, res);
@@ -331,7 +331,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @return Non-null topology version if update should be remapped.
      */
     private AffinityTopologyVersion onAllReceived() {
-        assert futId != null;
+        assert Thread.holdsLock(mux);
+        assert futId > 0;
 
         AffinityTopologyVersion remapTopVer0 = null;
 
@@ -362,7 +363,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             cctx.mvcc().removeAtomicFuture(futId);
 
             reqState = null;
-            futId = null;
+            futId = 0;
             topVer = AffinityTopologyVersion.ZERO;
 
             remapTopVer = null;
@@ -479,7 +480,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
     /** {@inheritDoc} */
     @Override protected void map(AffinityTopologyVersion topVer) {
-        long futId = cctx.mvcc().atomicFutureId();
+        long futId = cctx.mvcc().nextAtomicId();
 
         Exception err = null;
         PrimaryRequestState reqState0 = null;
@@ -488,7 +489,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             reqState0 = mapSingleUpdate(topVer, futId);
 
             synchronized (mux) {
-                assert this.futId == null : this;
+                assert this.futId == 0 : this;
                 assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
                 this.topVer = topVer;
@@ -529,7 +530,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     /**
      * @param futId Future ID.
      */
-    private void checkDhtNodes(Long futId) {
+    private void checkDhtNodes(long futId) {
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
         AffinityTopologyVersion remapTopVer0 = null;
@@ -537,7 +538,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         GridNearAtomicCheckUpdateRequest checkReq = null;
 
         synchronized (mux) {
-            if (this.futId == null || !this.futId.equals(futId))
+            if (this.futId == 0 || this.futId != futId)
                 return;
 
             assert reqState != null;
@@ -570,7 +571,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         synchronized (mux) {
             id0 = futId;
 
-            futId = null;
+            futId = 0;
         }
 
         return id0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index a44ccf9..84deefc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -150,7 +150,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     }
 
     /** {@inheritDoc} */
-    @Override public Long id() {
+    @Override public long id() {
         synchronized (mux) {
             return futId;
         }
@@ -167,7 +167,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
 
         synchronized (mux) {
-            if (futId == null)
+            if (futId == 0)
                 return false;
 
             if (singleReq != null) {
@@ -300,7 +300,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         AffinityTopologyVersion remapTopVer0;
 
         synchronized (mux) {
-            if (futId == null || futId != res.futureId())
+            if (futId == 0 || futId != res.futureId())
                 return;
 
             PrimaryRequestState reqState;
@@ -373,7 +373,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         boolean rcvAll;
 
         synchronized (mux) {
-            if (futId == null || futId != res.futureId())
+            if (futId == 0 || futId != res.futureId())
                 return;
 
             if (singleReq != null) {
@@ -534,7 +534,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @return Non null topology version if update should be remapped.
      */
     @Nullable private AffinityTopologyVersion onAllReceived() {
-        assert futId != null;
+        assert Thread.holdsLock(mux);
+        assert futId > 0;
 
         AffinityTopologyVersion remapTopVer0 = null;
 
@@ -577,7 +578,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         if (remapTopVer0 != null) {
             cctx.mvcc().removeAtomicFuture(futId);
 
-            futId = null;
+            futId = 0;
             topVer = AffinityTopologyVersion.ZERO;
 
             remapTopVer = null;
@@ -767,7 +768,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             return;
         }
 
-        Long futId = cctx.mvcc().atomicFutureId();
+        long futId = cctx.mvcc().nextAtomicId();
 
         Exception err = null;
         PrimaryRequestState singleReq0 = null;
@@ -801,7 +802,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             }
 
             synchronized (mux) {
-                assert this.futId == null : this;
+                assert this.futId == 0 : this;
                 assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
                 this.topVer = topVer;
@@ -856,7 +857,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             checkDhtNodes(futId);
     }
 
-    private void checkDhtNodes(Long futId) {
+    private void checkDhtNodes(long futId) {
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
         AffinityTopologyVersion remapTopVer0 = null;
@@ -866,7 +867,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         boolean rcvAll = false;
 
         synchronized (mux) {
-            if (this.futId == null || !this.futId.equals(futId))
+            if (this.futId == 0 || this.futId != futId)
                 return;
 
             if (singleReq != null) {
@@ -940,7 +941,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         synchronized (mux) {
             id0 = futId;
 
-            futId = null;
+            futId = 0;
         }
 
         return id0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index d1334ef..6b383a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -211,7 +211,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
         txHnd = new IgniteTxHandler(cctx);
 
-        deferredAckMsgSnd = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) {
+        deferredAckMsgSnd = new GridDeferredAckMessageSender<GridCacheVersion>(cctx.time(), cctx.kernalContext().closure()) {
             @Override public int getTimeout() {
                 return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
             }