You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/17 15:04:52 UTC
[07/10] ignite git commit: tmp
tmp
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f518395
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f518395
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f518395
Branch: refs/heads/ignite-4680-sb
Commit: 5f51839525a839c1eec9c28aa7772cc9f1bc59c1
Parents: 3e7ee08
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 17 15:54:56 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 17 15:54:56 2017 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 33 ++++++++++++++++++++
.../dht/atomic/GridDhtAtomicCache.java | 15 ++++++---
.../atomic/GridNearAtomicFullUpdateRequest.java | 1 +
.../apache/ignite/internal/util/MPSCQueue.java | 8 ++---
4 files changed, 49 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f518395/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 39c514b..6dad30b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -55,11 +55,13 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearAtomicResponseHelper;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
+import org.apache.ignite.internal.util.MPSCQueue;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -201,6 +203,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
};
+ private Thread resThread;
+
+ private MPSCQueue<Runnable> q;
+
/**
* @param ctx Grid kernal context.
*/
@@ -221,6 +227,26 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
synchronized (sysLsnrsMux) {
sysLsnrs = new GridMessageListener[GridTopic.values().length];
}
+
+ resThread = new Thread() {
+ public void run() {
+ while (true) {
+ try {
+ Runnable r = q.take();
+
+ r.run();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ };
+
+ q = new MPSCQueue<>(resThread);
+
+ resThread.setDaemon(true);
+ resThread.start();
}
/**
@@ -823,6 +849,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return;
}
+// if (msg.message() instanceof GridNearAtomicUpdateResponse) {
+// q.add(c);
+//
+// return;
+// }
+
+
if (plc == GridIoPolicy.SYSTEM_POOL &&
(msg.partition() != Integer.MIN_VALUE ||
msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE)) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f518395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 973256f..dcc79d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1761,6 +1761,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
completionCb.apply(req, res);
}
+ private GridCacheVersion ver;
+
+
/**
* Executes local update after preloader fetched values.
*
@@ -1837,13 +1840,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Do not check topology version if topology was locked on near node by
// external transaction or explicit lock.
- if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
+ if (true || req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
locked = lockEntries(req, req.topologyVersion(), stripeIdxs);
boolean hasNear = ctx.discovery().cacheNearNode(node, name());
// Assign next version for update inside entries lock.
- GridCacheVersion ver = ctx.versions().next(top.topologyVersion());
+ if (ver == null)
+ ver = ctx.versions().next(top.topologyVersion());
if (hasNear)
res.nearVersion(ver);
@@ -1859,7 +1863,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
- dhtFut = createDhtFuture(ver, req, size);
+ dhtFut = null;//createDhtFuture(ver, req, size);
expiry = expiryPolicy(req.expiry());
@@ -1977,9 +1981,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
completionCb.apply(req, res);
}
- else
+ else {
if (dhtFut != null)
dhtFut.map(node, res.returnValue(), res, completionCb);
+ else
+ completionCb.apply(req, res);
+ }
if (req.writeSynchronizationMode() != FULL_ASYNC)
req.cleanup(!node.isLocal());
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f518395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 2e619ee..ce6035e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -406,6 +406,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
/** {@inheritDoc} */
@Override @Nullable public Map<Integer, int[]> stripeMap() {
+ //stripeMap = null;
return stripeMap;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f518395/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
index 5505b3a..5725390 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
@@ -41,7 +41,7 @@ public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueu
/** */
final AtomicReference<Node> putStack = new AtomicReference<>();
/** */
- private final AtomicInteger takeStackSize = new AtomicInteger();
+ //private final AtomicInteger takeStackSize = new AtomicInteger();
/** */
private Thread consumerThread;
@@ -189,7 +189,7 @@ public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueu
private void dequeue() {
takeStack[takeStackIndex] = null;
takeStackIndex++;
- takeStackSize.lazySet(takeStackSize.get() - 1);
+ //takeStackSize.lazySet(takeStackSize.get() - 1);
}
/** */
@@ -248,7 +248,7 @@ public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueu
private void copyIntoTakeStack(Node putStackHead) {
int putStackSize = putStackHead.size;
- takeStackSize.lazySet(putStackSize);
+ //takeStackSize.lazySet(putStackSize);
if (putStackSize > takeStack.length)
takeStack = new Object[nextPowerOfTwo(putStackHead.size)];
@@ -270,7 +270,7 @@ public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueu
@Override public int size() {
Node h = putStack.get();
int putStackSize = h == null ? 0 : h.size;
- return putStackSize + takeStackSize.get();
+ return putStackSize + 0;//takeStackSize.get();
}
/** {@inheritDoc}. */