You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2017/04/11 10:06:09 UTC
[20/22] ignite git commit: ignite-4571 - reviewed contribution
ignite-4571 - reviewed contribution
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b99c1980
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b99c1980
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b99c1980
Branch: refs/heads/ignite-3477-master
Commit: b99c1980dfe250b3f24a94eb4b6cb948bb314ab5
Parents: 5c48260
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Mon Apr 10 18:29:33 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Mon Apr 10 18:29:33 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAtomicFuture.java | 2 +-
.../processors/cache/GridCacheMvccManager.java | 84 ++++++++++++++++----
.../cache/GridDeferredAckMessageSender.java | 11 ++-
.../GridDhtAtomicAbstractUpdateFuture.java | 4 +-
.../GridNearAtomicAbstractUpdateFuture.java | 2 +-
.../GridNearAtomicSingleUpdateFuture.java | 21 ++---
.../dht/atomic/GridNearAtomicUpdateFuture.java | 23 +++---
.../cache/transactions/IgniteTxManager.java | 2 +-
8 files changed, 102 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 8df229e..87ae29c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -27,7 +27,7 @@ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
/**
* @return Future ID.
*/
- public Long id();
+ public long id();
/**
* Gets future that will be completed when it is safe when update is finished on the given version of topology.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index dff2c88..712d136 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -81,6 +81,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
/** Maxim number of removed locks. */
private static final int MAX_REMOVED_LOCKS = 10240;
+ /** Maximum number of atomic IDs reserved for a thread at a time. Must be a power of two! */
+ protected static final int THREAD_RESERVE_SIZE = 0x4000;
+
/** */
private static final int MAX_NESTED_LSNR_CALLS = getInteger(IGNITE_MAX_NESTED_LISTENER_CALLS, 5);
@@ -106,9 +109,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
@GridToStringExclude
private final ConcurrentMap<GridCacheVersion, Collection<GridCacheMvccFuture<?>>> mvccFuts = newMap();
- /** */
- private final AtomicLong atomicFutId = new AtomicLong(U.currentTimeMillis());
-
/** Pending atomic futures. */
private final ConcurrentHashMap8<Long, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>();
@@ -138,6 +138,16 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
/** */
private volatile boolean stopping;
+ /** Global atomic id counter. */
+ protected final AtomicLong globalAtomicCnt = new AtomicLong();
+
+ /** Per-thread atomic ID counter; a thread's first access seeds it by reserving {@code THREAD_RESERVE_SIZE} IDs from {@code globalAtomicCnt}. */
+ private final ThreadLocal<LongWrapper> threadAtomicCnt = new ThreadLocal<LongWrapper>() {
+ @Override protected LongWrapper initialValue() {
+ return new LongWrapper(globalAtomicCnt.getAndAdd(THREAD_RESERVE_SIZE));
+ }
+ };
+
/** Lock callback. */
@GridToStringExclude
private final GridCacheMvccCallback cb = new GridCacheMvccCallback() {
@@ -256,9 +266,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
cacheFut.onNodeLeft(discoEvt.eventNode().id());
if (cacheFut.isCancelled() || cacheFut.isDone()) {
- Long futId = cacheFut.id();
+ long futId = cacheFut.id();
- if (futId != null)
+ if (futId > 0)
atomicFuts.remove(futId, cacheFut);
}
}
@@ -426,18 +436,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
- * @return ID for atomic cache update future.
- */
- public long atomicFutureId() {
- return atomicFutId.incrementAndGet();
- }
-
- /**
* @param futId Future ID.
* @param fut Future.
* @return {@code False} if future was forcibly completed with error.
*/
- public boolean addAtomicFuture(Long futId, GridCacheAtomicFuture<?> fut) {
+ public boolean addAtomicFuture(long futId, GridCacheAtomicFuture<?> fut) {
IgniteInternalFuture<?> old = atomicFuts.put(futId, fut);
assert old == null : "Old future is not null [futId=" + futId + ", fut=" + fut + ", old=" + old + ']';
@@ -472,7 +475,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @param futId Future ID.
* @return Future.
*/
- @Nullable public IgniteInternalFuture<?> atomicFuture(Long futId) {
+ @Nullable public IgniteInternalFuture<?> atomicFuture(long futId) {
return atomicFuts.get(futId);
}
@@ -480,7 +483,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @param futId Future ID.
* @return Removed future.
*/
- @Nullable public IgniteInternalFuture<?> removeAtomicFuture(Long futId) {
+ @Nullable public IgniteInternalFuture<?> removeAtomicFuture(long futId) {
return atomicFuts.remove(futId);
}
@@ -1174,6 +1177,20 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @return Next future ID for atomic futures, taken from the calling thread's own counter ({@code threadAtomicCnt}).
+ */
+ public long nextAtomicId() {
+ LongWrapper cnt = threadAtomicCnt.get();
+
+ long res = cnt.getAndIncrement();
+
+ if ((cnt.get() & (THREAD_RESERVE_SIZE - 1)) == 0) // Incremented counter hit a multiple of THREAD_RESERVE_SIZE (mask works because the size is a power of two).
+ cnt.set(globalAtomicCnt.getAndAdd(THREAD_RESERVE_SIZE)); // Continue from a freshly reserved range of the global counter.
+
+ return res;
+ }
+
+ /**
*
*/
private class FinishLockFuture extends GridFutureAdapter<Object> {
@@ -1382,4 +1399,41 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
return S.toString(DataStreamerFuture.class, this, super.toString());
}
}
+
+ /** Mutable {@code long} holder with no synchronization of its own; value type of the per-thread atomic ID counter. */
+ private static class LongWrapper {
+ /** Current counter value. */
+ private long val;
+
+ /**
+ * @param val Initial value; the counter starts at {@code val + 1}, or at {@code 1} when {@code val + 1} is zero.
+ */
+ public LongWrapper(long val) {
+ this.val = val + 1;
+
+ if (this.val == 0)
+ this.val = 1;
+ }
+
+ /**
+ * @param val Value to set (stored as is, without the adjustment the constructor applies).
+ */
+ public void set(long val) {
+ this.val = val;
+ }
+
+ /**
+ * @return Current value.
+ */
+ public long get() {
+ return val;
+ }
+
+ /**
+ * @return Current value; the stored value is incremented afterwards.
+ */
+ public long getAndIncrement() {
+ return val++;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
index 5265ec9..89aa725 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
@@ -21,7 +21,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -33,7 +32,7 @@ import org.jsr166.ConcurrentLinkedDeque8;
/**
*
*/
-public abstract class GridDeferredAckMessageSender {
+public abstract class GridDeferredAckMessageSender<T> {
/** Deferred message buffers. */
private ConcurrentMap<UUID, DeferredAckMessageBuffer> deferredAckMsgBuffers = new ConcurrentHashMap8<>();
@@ -67,7 +66,7 @@ public abstract class GridDeferredAckMessageSender {
* @param nodeId Node ID.
* @param vers Versions to send.
*/
- public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers);
+ public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8<T> vers);
/**
*
@@ -81,7 +80,7 @@ public abstract class GridDeferredAckMessageSender {
* @param nodeId Node ID to send message to.
* @param ver Version to ack.
*/
- public void sendDeferredAckMessage(UUID nodeId, GridCacheVersion ver) {
+ public void sendDeferredAckMessage(UUID nodeId, T ver) {
while (true) {
DeferredAckMessageBuffer buf = deferredAckMsgBuffers.get(nodeId);
@@ -117,7 +116,7 @@ public abstract class GridDeferredAckMessageSender {
private AtomicBoolean guard = new AtomicBoolean(false);
/** Versions. */
- private ConcurrentLinkedDeque8<GridCacheVersion> vers = new ConcurrentLinkedDeque8<>();
+ private ConcurrentLinkedDeque8<T> vers = new ConcurrentLinkedDeque8<>();
/** Node ID. */
private final UUID nodeId;
@@ -173,7 +172,7 @@ public abstract class GridDeferredAckMessageSender {
* @param ver Version to send.
* @return {@code True} if request was handled, {@code false} if this buffer is filled and cannot be used.
*/
- public boolean add(GridCacheVersion ver) {
+ public boolean add(T ver) {
readLock().lock();
boolean snd = false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 5ff5aa4..0940acb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -110,7 +110,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
this.updateReq = updateReq;
this.writeVer = writeVer;
- futId = cctx.mvcc().atomicFutureId();
+ futId = cctx.mvcc().nextAtomicId();
if (log == null) {
msgLog = cctx.shared().atomicMessageLogger();
@@ -295,7 +295,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
}
/** {@inheritDoc} */
- @Override public final Long id() {
+ @Override public final long id() {
return futId;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 39abb73..a2adb05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -138,7 +138,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
/** Future ID. */
@GridToStringInclude
- protected Long futId;
+ protected long futId;
/** Operation result. */
protected GridCacheReturn opRes;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index c2372d1..e4ba457 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -125,7 +125,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
/** {@inheritDoc} */
- @Override public Long id() {
+ @Override public long id() {
synchronized (mux) {
return futId;
}
@@ -216,7 +216,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
AffinityTopologyVersion remapTopVer0;
synchronized (mux) {
- if (futId == null || futId != res.futureId())
+ if (futId == 0 || futId != res.futureId())
return;
assert reqState != null;
@@ -258,7 +258,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
CachePartialUpdateCheckedException err0 = null;
synchronized (mux) {
- if (futId == null || futId != res.futureId())
+ if (futId == 0 || futId != res.futureId())
return;
req = reqState.processPrimaryResponse(nodeId, res);
@@ -331,7 +331,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
* @return Non-null topology version if update should be remapped.
*/
private AffinityTopologyVersion onAllReceived() {
- assert futId != null;
+ assert Thread.holdsLock(mux);
+ assert futId > 0;
AffinityTopologyVersion remapTopVer0 = null;
@@ -362,7 +363,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
cctx.mvcc().removeAtomicFuture(futId);
reqState = null;
- futId = null;
+ futId = 0;
topVer = AffinityTopologyVersion.ZERO;
remapTopVer = null;
@@ -479,7 +480,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
/** {@inheritDoc} */
@Override protected void map(AffinityTopologyVersion topVer) {
- long futId = cctx.mvcc().atomicFutureId();
+ long futId = cctx.mvcc().nextAtomicId();
Exception err = null;
PrimaryRequestState reqState0 = null;
@@ -488,7 +489,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
reqState0 = mapSingleUpdate(topVer, futId);
synchronized (mux) {
- assert this.futId == null : this;
+ assert this.futId == 0 : this;
assert this.topVer == AffinityTopologyVersion.ZERO : this;
this.topVer = topVer;
@@ -529,7 +530,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
/**
* @param futId Future ID.
*/
- private void checkDhtNodes(Long futId) {
+ private void checkDhtNodes(long futId) {
GridCacheReturn opRes0 = null;
CachePartialUpdateCheckedException err0 = null;
AffinityTopologyVersion remapTopVer0 = null;
@@ -537,7 +538,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
GridNearAtomicCheckUpdateRequest checkReq = null;
synchronized (mux) {
- if (this.futId == null || !this.futId.equals(futId))
+ if (this.futId == 0 || this.futId != futId)
return;
assert reqState != null;
@@ -570,7 +571,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
synchronized (mux) {
id0 = futId;
- futId = null;
+ futId = 0;
}
return id0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index a44ccf9..84deefc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -150,7 +150,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
/** {@inheritDoc} */
- @Override public Long id() {
+ @Override public long id() {
synchronized (mux) {
return futId;
}
@@ -167,7 +167,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
synchronized (mux) {
- if (futId == null)
+ if (futId == 0)
return false;
if (singleReq != null) {
@@ -300,7 +300,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
AffinityTopologyVersion remapTopVer0;
synchronized (mux) {
- if (futId == null || futId != res.futureId())
+ if (futId == 0 || futId != res.futureId())
return;
PrimaryRequestState reqState;
@@ -373,7 +373,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
boolean rcvAll;
synchronized (mux) {
- if (futId == null || futId != res.futureId())
+ if (futId == 0 || futId != res.futureId())
return;
if (singleReq != null) {
@@ -534,7 +534,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
* @return Non null topology version if update should be remapped.
*/
@Nullable private AffinityTopologyVersion onAllReceived() {
- assert futId != null;
+ assert Thread.holdsLock(mux);
+ assert futId > 0;
AffinityTopologyVersion remapTopVer0 = null;
@@ -577,7 +578,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (remapTopVer0 != null) {
cctx.mvcc().removeAtomicFuture(futId);
- futId = null;
+ futId = 0;
topVer = AffinityTopologyVersion.ZERO;
remapTopVer = null;
@@ -767,7 +768,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
return;
}
- Long futId = cctx.mvcc().atomicFutureId();
+ long futId = cctx.mvcc().nextAtomicId();
Exception err = null;
PrimaryRequestState singleReq0 = null;
@@ -801,7 +802,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
synchronized (mux) {
- assert this.futId == null : this;
+ assert this.futId == 0 : this;
assert this.topVer == AffinityTopologyVersion.ZERO : this;
this.topVer = topVer;
@@ -856,7 +857,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
checkDhtNodes(futId);
}
- private void checkDhtNodes(Long futId) {
+ private void checkDhtNodes(long futId) {
GridCacheReturn opRes0 = null;
CachePartialUpdateCheckedException err0 = null;
AffinityTopologyVersion remapTopVer0 = null;
@@ -866,7 +867,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
boolean rcvAll = false;
synchronized (mux) {
- if (this.futId == null || !this.futId.equals(futId))
+ if (this.futId == 0 || this.futId != futId)
return;
if (singleReq != null) {
@@ -940,7 +941,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
synchronized (mux) {
id0 = futId;
- futId = null;
+ futId = 0;
}
return id0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index d1334ef..6b383a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -211,7 +211,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
txHnd = new IgniteTxHandler(cctx);
- deferredAckMsgSnd = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) {
+ deferredAckMsgSnd = new GridDeferredAckMessageSender<GridCacheVersion>(cctx.time(), cctx.kernalContext().closure()) {
@Override public int getTimeout() {
return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
}