You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/11 10:37:34 UTC

[1/4] ignite git commit: ignite-4571 - reviewed contribution

Repository: ignite
Updated Branches:
  refs/heads/ignite-4929 0d0297e9f -> dffea7168


ignite-4571 - reviewed contribution


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b99c1980
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b99c1980
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b99c1980

Branch: refs/heads/ignite-4929
Commit: b99c1980dfe250b3f24a94eb4b6cb948bb314ab5
Parents: 5c48260
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Mon Apr 10 18:29:33 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Mon Apr 10 18:29:33 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAtomicFuture.java |  2 +-
 .../processors/cache/GridCacheMvccManager.java  | 84 ++++++++++++++++----
 .../cache/GridDeferredAckMessageSender.java     | 11 ++-
 .../GridDhtAtomicAbstractUpdateFuture.java      |  4 +-
 .../GridNearAtomicAbstractUpdateFuture.java     |  2 +-
 .../GridNearAtomicSingleUpdateFuture.java       | 21 ++---
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 23 +++---
 .../cache/transactions/IgniteTxManager.java     |  2 +-
 8 files changed, 102 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 8df229e..87ae29c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -27,7 +27,7 @@ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
     /**
      * @return Future ID.
      */
-    public Long id();
+    public long id();
 
     /**
      * Gets future that will be completed when it is safe when update is finished on the given version of topology.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index dff2c88..712d136 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -81,6 +81,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     /** Maxim number of removed locks. */
     private static final int MAX_REMOVED_LOCKS = 10240;
 
+    /** Maxim number of atomic IDs for thread. Must be power of two! */
+    protected static final int THREAD_RESERVE_SIZE = 0x4000;
+
     /** */
     private static final int MAX_NESTED_LSNR_CALLS = getInteger(IGNITE_MAX_NESTED_LISTENER_CALLS, 5);
 
@@ -106,9 +109,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     @GridToStringExclude
     private final ConcurrentMap<GridCacheVersion, Collection<GridCacheMvccFuture<?>>> mvccFuts = newMap();
 
-    /** */
-    private final AtomicLong atomicFutId = new AtomicLong(U.currentTimeMillis());
-
     /** Pending atomic futures. */
     private final ConcurrentHashMap8<Long, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>();
 
@@ -138,6 +138,16 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     /** */
     private volatile boolean stopping;
 
+    /** Global atomic id counter. */
+    protected final AtomicLong globalAtomicCnt = new AtomicLong();
+
+    /** Per thread atomic id counter. */
+    private final ThreadLocal<LongWrapper> threadAtomicCnt = new ThreadLocal<LongWrapper>() {
+        @Override protected LongWrapper initialValue() {
+            return new LongWrapper(globalAtomicCnt.getAndAdd(THREAD_RESERVE_SIZE));
+        }
+    };
+
     /** Lock callback. */
     @GridToStringExclude
     private final GridCacheMvccCallback cb = new GridCacheMvccCallback() {
@@ -256,9 +266,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                 cacheFut.onNodeLeft(discoEvt.eventNode().id());
 
                 if (cacheFut.isCancelled() || cacheFut.isDone()) {
-                    Long futId = cacheFut.id();
+                    long futId = cacheFut.id();
 
-                    if (futId != null)
+                    if (futId > 0)
                         atomicFuts.remove(futId, cacheFut);
                 }
             }
@@ -426,18 +436,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * @return ID for atomic cache update future.
-     */
-    public long atomicFutureId() {
-        return atomicFutId.incrementAndGet();
-    }
-
-    /**
      * @param futId Future ID.
      * @param fut Future.
      * @return {@code False} if future was forcibly completed with error.
      */
-    public boolean addAtomicFuture(Long futId, GridCacheAtomicFuture<?> fut) {
+    public boolean addAtomicFuture(long futId, GridCacheAtomicFuture<?> fut) {
         IgniteInternalFuture<?> old = atomicFuts.put(futId, fut);
 
         assert old == null : "Old future is not null [futId=" + futId + ", fut=" + fut + ", old=" + old + ']';
@@ -472,7 +475,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @param futId Future ID.
      * @return Future.
      */
-    @Nullable public IgniteInternalFuture<?> atomicFuture(Long futId) {
+    @Nullable public IgniteInternalFuture<?> atomicFuture(long futId) {
         return atomicFuts.get(futId);
     }
 
@@ -480,7 +483,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @param futId Future ID.
      * @return Removed future.
      */
-    @Nullable public IgniteInternalFuture<?> removeAtomicFuture(Long futId) {
+    @Nullable public IgniteInternalFuture<?> removeAtomicFuture(long futId) {
         return atomicFuts.remove(futId);
     }
 
@@ -1174,6 +1177,20 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @return Next future ID for atomic futures.
+     */
+    public long nextAtomicId() {
+        LongWrapper cnt = threadAtomicCnt.get();
+
+        long res = cnt.getAndIncrement();
+
+        if ((cnt.get() & (THREAD_RESERVE_SIZE - 1)) == 0)
+            cnt.set(globalAtomicCnt.getAndAdd(THREAD_RESERVE_SIZE));
+
+        return res;
+    }
+
+    /**
      *
      */
     private class FinishLockFuture extends GridFutureAdapter<Object> {
@@ -1382,4 +1399,41 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
             return S.toString(DataStreamerFuture.class, this, super.toString());
         }
     }
+
+    /** Long wrapper. */
+    private static class LongWrapper {
+        /** */
+        private long val;
+
+        /**
+         * @param val Value.
+         */
+        public LongWrapper(long val) {
+            this.val = val + 1;
+
+            if (this.val == 0)
+                this.val = 1;
+        }
+
+        /**
+         * @param val Value to set.
+         */
+        public void set(long val) {
+            this.val = val;
+        }
+
+        /**
+         * @return Current value.
+         */
+        public long get() {
+            return val;
+        }
+
+        /**
+         * @return Current value (and stores incremented value).
+         */
+        public long getAndIncrement() {
+            return val++;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
index 5265ec9..89aa725 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
@@ -21,7 +21,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -33,7 +32,7 @@ import org.jsr166.ConcurrentLinkedDeque8;
 /**
  *
  */
-public abstract class GridDeferredAckMessageSender {
+public abstract class GridDeferredAckMessageSender<T> {
     /** Deferred message buffers. */
     private ConcurrentMap<UUID, DeferredAckMessageBuffer> deferredAckMsgBuffers = new ConcurrentHashMap8<>();
 
@@ -67,7 +66,7 @@ public abstract class GridDeferredAckMessageSender {
      * @param nodeId Node ID.
      * @param vers Versions to send.
      */
-    public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers);
+    public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8<T> vers);
 
     /**
      *
@@ -81,7 +80,7 @@ public abstract class GridDeferredAckMessageSender {
      * @param nodeId Node ID to send message to.
      * @param ver Version to ack.
      */
-    public void sendDeferredAckMessage(UUID nodeId, GridCacheVersion ver) {
+    public void sendDeferredAckMessage(UUID nodeId, T ver) {
         while (true) {
             DeferredAckMessageBuffer buf = deferredAckMsgBuffers.get(nodeId);
 
@@ -117,7 +116,7 @@ public abstract class GridDeferredAckMessageSender {
         private AtomicBoolean guard = new AtomicBoolean(false);
 
         /** Versions. */
-        private ConcurrentLinkedDeque8<GridCacheVersion> vers = new ConcurrentLinkedDeque8<>();
+        private ConcurrentLinkedDeque8<T> vers = new ConcurrentLinkedDeque8<>();
 
         /** Node ID. */
         private final UUID nodeId;
@@ -173,7 +172,7 @@ public abstract class GridDeferredAckMessageSender {
          * @param ver Version to send.
          * @return {@code True} if request was handled, {@code false} if this buffer is filled and cannot be used.
          */
-        public boolean add(GridCacheVersion ver) {
+        public boolean add(T ver) {
             readLock().lock();
 
             boolean snd = false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 5ff5aa4..0940acb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -110,7 +110,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         this.updateReq = updateReq;
         this.writeVer = writeVer;
 
-        futId = cctx.mvcc().atomicFutureId();
+        futId = cctx.mvcc().nextAtomicId();
 
         if (log == null) {
             msgLog = cctx.shared().atomicMessageLogger();
@@ -295,7 +295,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     }
 
     /** {@inheritDoc} */
-    @Override public final Long id() {
+    @Override public final long id() {
         return futId;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 39abb73..a2adb05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -138,7 +138,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
     /** Future ID. */
     @GridToStringInclude
-    protected Long futId;
+    protected long futId;
 
     /** Operation result. */
     protected GridCacheReturn opRes;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index c2372d1..e4ba457 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -125,7 +125,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     }
 
     /** {@inheritDoc} */
-    @Override public Long id() {
+    @Override public long id() {
         synchronized (mux) {
             return futId;
         }
@@ -216,7 +216,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         AffinityTopologyVersion remapTopVer0;
 
         synchronized (mux) {
-            if (futId == null || futId != res.futureId())
+            if (futId == 0 || futId != res.futureId())
                 return;
 
             assert reqState != null;
@@ -258,7 +258,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         CachePartialUpdateCheckedException err0 = null;
 
         synchronized (mux) {
-            if (futId == null || futId != res.futureId())
+            if (futId == 0 || futId != res.futureId())
                 return;
 
             req = reqState.processPrimaryResponse(nodeId, res);
@@ -331,7 +331,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @return Non-null topology version if update should be remapped.
      */
     private AffinityTopologyVersion onAllReceived() {
-        assert futId != null;
+        assert Thread.holdsLock(mux);
+        assert futId > 0;
 
         AffinityTopologyVersion remapTopVer0 = null;
 
@@ -362,7 +363,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             cctx.mvcc().removeAtomicFuture(futId);
 
             reqState = null;
-            futId = null;
+            futId = 0;
             topVer = AffinityTopologyVersion.ZERO;
 
             remapTopVer = null;
@@ -479,7 +480,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
     /** {@inheritDoc} */
     @Override protected void map(AffinityTopologyVersion topVer) {
-        long futId = cctx.mvcc().atomicFutureId();
+        long futId = cctx.mvcc().nextAtomicId();
 
         Exception err = null;
         PrimaryRequestState reqState0 = null;
@@ -488,7 +489,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             reqState0 = mapSingleUpdate(topVer, futId);
 
             synchronized (mux) {
-                assert this.futId == null : this;
+                assert this.futId == 0 : this;
                 assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
                 this.topVer = topVer;
@@ -529,7 +530,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     /**
      * @param futId Future ID.
      */
-    private void checkDhtNodes(Long futId) {
+    private void checkDhtNodes(long futId) {
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
         AffinityTopologyVersion remapTopVer0 = null;
@@ -537,7 +538,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         GridNearAtomicCheckUpdateRequest checkReq = null;
 
         synchronized (mux) {
-            if (this.futId == null || !this.futId.equals(futId))
+            if (this.futId == 0 || this.futId != futId)
                 return;
 
             assert reqState != null;
@@ -570,7 +571,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         synchronized (mux) {
             id0 = futId;
 
-            futId = null;
+            futId = 0;
         }
 
         return id0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index a44ccf9..84deefc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -150,7 +150,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     }
 
     /** {@inheritDoc} */
-    @Override public Long id() {
+    @Override public long id() {
         synchronized (mux) {
             return futId;
         }
@@ -167,7 +167,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
 
         synchronized (mux) {
-            if (futId == null)
+            if (futId == 0)
                 return false;
 
             if (singleReq != null) {
@@ -300,7 +300,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         AffinityTopologyVersion remapTopVer0;
 
         synchronized (mux) {
-            if (futId == null || futId != res.futureId())
+            if (futId == 0 || futId != res.futureId())
                 return;
 
             PrimaryRequestState reqState;
@@ -373,7 +373,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         boolean rcvAll;
 
         synchronized (mux) {
-            if (futId == null || futId != res.futureId())
+            if (futId == 0 || futId != res.futureId())
                 return;
 
             if (singleReq != null) {
@@ -534,7 +534,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @return Non null topology version if update should be remapped.
      */
     @Nullable private AffinityTopologyVersion onAllReceived() {
-        assert futId != null;
+        assert Thread.holdsLock(mux);
+        assert futId > 0;
 
         AffinityTopologyVersion remapTopVer0 = null;
 
@@ -577,7 +578,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         if (remapTopVer0 != null) {
             cctx.mvcc().removeAtomicFuture(futId);
 
-            futId = null;
+            futId = 0;
             topVer = AffinityTopologyVersion.ZERO;
 
             remapTopVer = null;
@@ -767,7 +768,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             return;
         }
 
-        Long futId = cctx.mvcc().atomicFutureId();
+        long futId = cctx.mvcc().nextAtomicId();
 
         Exception err = null;
         PrimaryRequestState singleReq0 = null;
@@ -801,7 +802,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             }
 
             synchronized (mux) {
-                assert this.futId == null : this;
+                assert this.futId == 0 : this;
                 assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
                 this.topVer = topVer;
@@ -856,7 +857,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             checkDhtNodes(futId);
     }
 
-    private void checkDhtNodes(Long futId) {
+    private void checkDhtNodes(long futId) {
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
         AffinityTopologyVersion remapTopVer0 = null;
@@ -866,7 +867,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         boolean rcvAll = false;
 
         synchronized (mux) {
-            if (this.futId == null || !this.futId.equals(futId))
+            if (this.futId == 0 || this.futId != futId)
                 return;
 
             if (singleReq != null) {
@@ -940,7 +941,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         synchronized (mux) {
             id0 = futId;
 
-            futId = null;
+            futId = 0;
         }
 
         return id0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index d1334ef..6b383a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -211,7 +211,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
         txHnd = new IgniteTxHandler(cctx);
 
-        deferredAckMsgSnd = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) {
+        deferredAckMsgSnd = new GridDeferredAckMessageSender<GridCacheVersion>(cctx.time(), cctx.kernalContext().closure()) {
             @Override public int getTimeout() {
                 return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
             }


[4/4] ignite git commit: ignite-4929

Posted by sb...@apache.org.
ignite-4929


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dffea716
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dffea716
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dffea716

Branch: refs/heads/ignite-4929
Commit: dffea71687ee124e9cbd219df82bc748e9c708d6
Parents: 4e80e0c
Author: sboikov <sb...@gridgain.com>
Authored: Tue Apr 11 13:37:26 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Apr 11 13:37:26 2017 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtTxPrepareFuture.java      | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dffea716/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 68e51bb..3029b09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1527,6 +1527,21 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     ) {
         GridDistributedTxMapping global = globalMap.get(n.id());
 
+        if (!F.isEmpty(entry.entryProcessors())) {
+            GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
+                entry.cached().partition());
+
+            if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
+                T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue();
+
+                assert procVal != null : entry;
+
+                entry.op(procVal.get1());
+                entry.value(procVal.get2(), true, false);
+                entry.entryProcessors(null);
+            }
+        }
+
         if (global == null)
             globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
 


[3/4] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-4929

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-4929


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4e80e0ca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4e80e0ca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4e80e0ca

Branch: refs/heads/ignite-4929
Commit: 4e80e0caa4fc3fa8cd0aa0157dbb87b8e1289e59
Parents: 0d0297e edfa353
Author: sboikov <sb...@gridgain.com>
Authored: Tue Apr 11 11:33:53 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Apr 11 11:33:53 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAtomicFuture.java |  2 +-
 .../processors/cache/GridCacheMvccManager.java  | 84 ++++++++++++++++----
 .../cache/GridDeferredAckMessageSender.java     | 11 ++-
 .../GridDhtAtomicAbstractUpdateFuture.java      |  4 +-
 .../GridNearAtomicAbstractUpdateFuture.java     |  2 +-
 .../GridNearAtomicSingleUpdateFuture.java       | 21 ++---
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 23 +++---
 .../cache/transactions/IgniteTxManager.java     |  2 +-
 modules/log4j2/pom.xml                          |  4 +-
 .../ignite/logger/log4j2/Log4J2Logger.java      | 23 ++++--
 10 files changed, 122 insertions(+), 54 deletions(-)
----------------------------------------------------------------------



[2/4] ignite git commit: IGNITE-4847: Upgrade log4j2 to version 2.8.1

Posted by sb...@apache.org.
IGNITE-4847: Upgrade log4j2 to version 2.8.1


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/edfa353e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/edfa353e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/edfa353e

Branch: refs/heads/ignite-4929
Commit: edfa353eba7c87c802df9e32678cb253549949b3
Parents: b99c198
Author: Michael Griggs <en...@gmail.com>
Authored: Mon Apr 10 12:39:49 2017 -0700
Committer: Denis Magda <dm...@gridgain.com>
Committed: Mon Apr 10 12:39:49 2017 -0700

----------------------------------------------------------------------
 modules/log4j2/pom.xml                          |  4 ++--
 .../ignite/logger/log4j2/Log4J2Logger.java      | 23 +++++++++++++++-----
 2 files changed, 20 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/edfa353e/modules/log4j2/pom.xml
----------------------------------------------------------------------
diff --git a/modules/log4j2/pom.xml b/modules/log4j2/pom.xml
index 5b0e989..5c6e7c4 100644
--- a/modules/log4j2/pom.xml
+++ b/modules/log4j2/pom.xml
@@ -52,13 +52,13 @@
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-api</artifactId>
-            <version>2.3</version>
+            <version>2.8.1</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-core</artifactId>
-            <version>2.3</version>
+            <version>2.8.1</version>
         </dependency>
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/edfa353e/modules/log4j2/src/main/java/org/apache/ignite/logger/log4j2/Log4J2Logger.java
----------------------------------------------------------------------
diff --git a/modules/log4j2/src/main/java/org/apache/ignite/logger/log4j2/Log4J2Logger.java b/modules/log4j2/src/main/java/org/apache/ignite/logger/log4j2/Log4J2Logger.java
index ffe8e1b..dff3179 100644
--- a/modules/log4j2/src/main/java/org/apache/ignite/logger/log4j2/Log4J2Logger.java
+++ b/modules/log4j2/src/main/java/org/apache/ignite/logger/log4j2/Log4J2Logger.java
@@ -353,18 +353,31 @@ public class Log4J2Logger implements IgniteLogger, LoggerNodeIdAware {
 
         Configuration cfg = ctx.getConfiguration();
 
-        PatternLayout layout = PatternLayout.createLayout("[%d{ABSOLUTE}][%-5p][%t][%c{1}] %m%n", null, null,
-            Charset.defaultCharset(), false, false, null, null);
+        PatternLayout.Builder builder = PatternLayout.newBuilder();
+
+        builder
+            .withPattern("%d{ABSOLUTE}][%-5p][%t][%c{1}] %m%n")
+            .withCharset(Charset.defaultCharset())
+            .withAlwaysWriteExceptions(false)
+            .withNoConsoleNoAnsi(false);
+
+        PatternLayout layout = builder.build();
+
+        ConsoleAppender.Builder consoleAppenderBuilder = ConsoleAppender.newBuilder();
+
+        consoleAppenderBuilder
+            .withName(CONSOLE_APPENDER)
+            .withLayout(layout);
+
+        ConsoleAppender consoleApp = consoleAppenderBuilder.build();
 
-        final Appender consoleApp = ConsoleAppender.createAppender(layout, null, null, CONSOLE_APPENDER, null, null);
         consoleApp.start();
 
         AppenderRef ref = AppenderRef.createAppenderRef(CONSOLE_APPENDER, Level.TRACE, null);
 
         AppenderRef[] refs = {ref};
 
-        LoggerConfig logCfg = LoggerConfig.createLogger("false", Level.INFO, LogManager.ROOT_LOGGER_NAME, "",
-            refs, null, null, null);
+        LoggerConfig logCfg = LoggerConfig.createLogger(false, Level.INFO, LogManager.ROOT_LOGGER_NAME, "", refs, null, null, null);
 
         logCfg.addAppender(consoleApp, null, null);
         cfg.addAppender(consoleApp);