You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/04 14:29:41 UTC
[1/3] ignite git commit: Reverted invalid changes.
Repository: ignite
Updated Branches:
refs/heads/ignite-atomic-good-lock-bench a86c4c7d6 -> 78dd3ea42
Reverted invalid changes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2d6c6c9b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2d6c6c9b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2d6c6c9b
Branch: refs/heads/ignite-atomic-good-lock-bench
Commit: 2d6c6c9bd4ce2379edd32e8e472d52c183995920
Parents: a86c4c7
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Mar 4 16:10:40 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 4 16:10:40 2016 +0300
----------------------------------------------------------------------
.../internal/processors/cache/GridCacheIoManager.java | 6 +++---
.../distributed/dht/atomic/GridDhtAtomicCache.java | 12 ++++++------
2 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d6c6c9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 6cb3510..b297827 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -281,7 +281,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
@SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg,
final IgniteBiInClosure<UUID, GridCacheMessage> c) {
- //rw.readLock();
+ rw.readLock();
try {
if (stopping) {
@@ -312,7 +312,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (depEnabled)
cctx.deploy().ignoreOwnership(false);
- //rw.readUnlock();
+ rw.readUnlock();
}
}
@@ -619,7 +619,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
cctx.mvcc().contextReset();
// Unwind eviction notifications.
- //CU.unwindEvicts(cctx);
+ CU.unwindEvicts(cctx);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d6c6c9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 2ca54c3..e908c05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2611,12 +2611,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Optimization.
return;
-// // Must touch all entries since update may have deleted entries.
-// // Eviction manager will remove empty entries.
-// for (GridCacheMapEntry entry : locked) {
-// if (entry != null && (skip == null || !skip.contains(entry.key())))
-// ctx.evicts().touch(entry, topVer);
-// }
+ // Must touch all entries since update may have deleted entries.
+ // Eviction manager will remove empty entries.
+ for (GridCacheMapEntry entry : locked) {
+ if (entry != null && (skip == null || !skip.contains(entry.key())))
+ ctx.evicts().touch(entry, topVer);
+ }
}
/**
[3/3] ignite git commit: Spin-lock on hot cache IO manager path.
Posted by vo...@apache.org.
Spin-lock on hot cache IO manager path.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/78dd3ea4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/78dd3ea4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/78dd3ea4
Branch: refs/heads/ignite-atomic-good-lock-bench
Commit: 78dd3ea427fad9cad334443b3f6dfbb6d4b07a51
Parents: 673daf4
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Mar 4 16:29:26 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 4 16:29:26 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheIoManager.java | 85 ++++++--------------
.../internal/util/GridStripedSpinBusyLock.java | 74 ++++++++++++-----
2 files changed, 80 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/78dd3ea4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index b297827..0cd34be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -60,7 +59,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridLeanSet;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.GridStripedSpinBusyLock;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
@@ -102,11 +101,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
private ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> orderedHandlers =
new ConcurrentHashMap8<>();
- /** Stopping flag. */
- private boolean stopping;
-
/** Mutex. */
- private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
+ private final GridStripedSpinBusyLock lock = new GridStripedSpinBusyLock();
/** Deployment enabled. */
private boolean depEnabled;
@@ -245,32 +241,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
for (Object ordTopic : orderedHandlers.keySet())
cctx.gridIO().removeMessageListener(ordTopic);
- boolean interrupted = false;
-
- // Busy wait is intentional.
- while (true) {
- try {
- if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS))
- break;
- else
- Thread.sleep(200);
- }
- catch (InterruptedException ignore) {
- // Preserve interrupt status & ignore.
- // Note that interrupted flag is cleared.
- interrupted = true;
- }
- }
-
- if (interrupted)
- Thread.currentThread().interrupt();
-
- try {
- stopping = true;
- }
- finally {
- rw.writeUnlock();
- }
+ lock.block();
}
/**
@@ -281,39 +252,35 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
@SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg,
final IgniteBiInClosure<UUID, GridCacheMessage> c) {
- rw.readLock();
+ if (lock.enterBusy()) {
+ try {
+ if (depEnabled)
+ cctx.deploy().ignoreOwnership(true);
- try {
- if (stopping) {
- if (log.isDebugEnabled())
- log.debug("Received cache communication message while stopping (will ignore) [nodeId=" +
- nodeId + ", msg=" + cacheMsg + ']');
+ unmarshall(nodeId, cacheMsg);
- return;
+ if (cacheMsg.classError() != null)
+ processFailedMessage(nodeId, cacheMsg, c);
+ else
+ processMessage(nodeId, cacheMsg, c);
}
+ catch (Throwable e) {
+ U.error(log, "Failed to process message [senderId=" + nodeId +
+ ", messageType=" + cacheMsg.getClass() + ']', e);
- if (depEnabled)
- cctx.deploy().ignoreOwnership(true);
-
- unmarshall(nodeId, cacheMsg);
-
- if (cacheMsg.classError() != null)
- processFailedMessage(nodeId, cacheMsg, c);
- else
- processMessage(nodeId, cacheMsg, c);
- }
- catch (Throwable e) {
- U.error(log, "Failed to process message [senderId=" + nodeId + ", messageType=" + cacheMsg.getClass() + ']', e);
-
- if (e instanceof Error)
- throw (Error)e;
- }
- finally {
- if (depEnabled)
- cctx.deploy().ignoreOwnership(false);
+ if (e instanceof Error)
+ throw (Error)e;
+ }
+ finally {
+ if (depEnabled)
+ cctx.deploy().ignoreOwnership(false);
- rw.readUnlock();
+ lock.leaveBusy();
+ }
}
+ else if (log.isDebugEnabled())
+ log.debug("Received cache communication message while stopping (will ignore) [nodeId=" + nodeId +
+ ", msg=" + cacheMsg + ']');
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/78dd3ea4/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java
index a11b0b1..a7b9da5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java
@@ -17,8 +17,11 @@
package org.apache.ignite.internal.util;
-import java.util.Random;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.thread.IgniteThread;
+
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
/**
* Striped spin busy lock. Aimed to provide efficient "read" lock semantics while still maintaining safety when
@@ -31,15 +34,21 @@ public class GridStripedSpinBusyLock {
/** Default amount of stripes. */
private static final int DFLT_STRIPE_CNT = Runtime.getRuntime().availableProcessors() * 4;
+ /** Thread index generator. */
+ private static final AtomicInteger THREAD_IDX_GEN = new AtomicInteger();
+
/** Thread index. */
- private static ThreadLocal<Integer> THREAD_IDX = new ThreadLocal<Integer>() {
+ private static final ThreadLocal<Integer> THREAD_IDX = new ThreadLocal<Integer>() {
@Override protected Integer initialValue() {
- return new Random().nextInt(Integer.MAX_VALUE);
+ return THREAD_IDX_GEN.incrementAndGet();
}
};
- /** States; they are not subjects to false-sharing because actual values are located far from each other. */
- private final AtomicInteger[] states;
+ /** Amount of stripes. */
+ private final int stripeCnt;
+
+ /** States. */
+ private final AtomicIntegerArray states;
/**
* Default constructor.
@@ -54,10 +63,12 @@ public class GridStripedSpinBusyLock {
* @param stripeCnt Amount of stripes.
*/
public GridStripedSpinBusyLock(int stripeCnt) {
- states = new AtomicInteger[stripeCnt];
+ A.ensure(stripeCnt > 0, "stripeCnt > 0");
+
+ this.stripeCnt = stripeCnt;
- for (int i = 0; i < stripeCnt; i++)
- states[i] = new AtomicInteger();
+ // Each state must be located 64 bytes from the other to avoid false sharing.
+ states = new AtomicIntegerArray(adjusted(stripeCnt));
}
/**
@@ -66,7 +77,7 @@ public class GridStripedSpinBusyLock {
* @return {@code True} if entered busy state.
*/
public boolean enterBusy() {
- int val = state().incrementAndGet();
+ int val = states.incrementAndGet(index());
if ((val & WRITER_MASK) == WRITER_MASK) {
leaveBusy();
@@ -81,7 +92,7 @@ public class GridStripedSpinBusyLock {
* Leave busy state.
*/
public void leaveBusy() {
- state().decrementAndGet();
+ states.decrementAndGet(index());
}
/**
@@ -89,11 +100,13 @@ public class GridStripedSpinBusyLock {
*/
public void block() {
// 1. CAS-loop to set a writer bit.
- for (AtomicInteger state : states) {
+ for (int i = 0; i < stripeCnt; i++) {
+ int idx = adjusted(i);
+
while (true) {
- int oldVal = state.get();
+ int oldVal = states.get(idx);
- if (state.compareAndSet(oldVal, oldVal | WRITER_MASK))
+ if (states.compareAndSet(idx, oldVal, oldVal | WRITER_MASK))
break;
}
}
@@ -101,8 +114,10 @@ public class GridStripedSpinBusyLock {
// 2. Wait until all readers are out.
boolean interrupt = false;
- for (AtomicInteger state : states) {
- while (state.get() != WRITER_MASK) {
+ for (int i = 0; i < stripeCnt; i++) {
+ int idx = adjusted(i);
+
+ while (states.get(idx) != WRITER_MASK) {
try {
Thread.sleep(10);
}
@@ -117,11 +132,30 @@ public class GridStripedSpinBusyLock {
}
/**
- * Gets state of thread's stripe.
+ * Get index for the given thread.
+ *
+ * @return Index for the given thread.
+ */
+ private int index() {
+ Thread t = Thread.currentThread();
+
+ if (t instanceof IgniteThread) {
+ int idx = ((IgniteThread) t).groupIndex();
+
+ if (idx != IgniteThread.GRP_IDX_UNASSIGNED)
+ return idx;
+ }
+
+ return adjusted(THREAD_IDX.get() % stripeCnt);
+ }
+
+ /**
+ * Gets value adjusted for striping.
*
- * @return State.
+ * @param val Value.
+ * @return Value.
*/
- private AtomicInteger state() {
- return states[THREAD_IDX.get() % states.length];
+ private static int adjusted(int val) {
+ return val << 4;
}
-}
+}
\ No newline at end of file
[2/3] ignite git commit: Optimized back-pressure control.
Posted by vo...@apache.org.
Optimized back-pressure control.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/673daf48
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/673daf48
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/673daf48
Branch: refs/heads/ignite-atomic-good-lock-bench
Commit: 673daf48b5a394c5a68dcf151cb13e6c09428928
Parents: 2d6c6c9
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Mar 4 16:11:04 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 4 16:11:04 2016 +0300
----------------------------------------------------------------------
.../util/nio/GridNioBackPressureControl.java | 16 ++++++++++++++--
.../org/apache/ignite/thread/IgniteThread.java | 17 +++++++++++++++++
2 files changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/673daf48/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java
index 96a1ab3..4d69533 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.util.nio;
+import org.apache.ignite.thread.IgniteThread;
+
/**
* Utility class that allows to ignore back-pressure control for threads that are processing messages.
*/
@@ -32,13 +34,23 @@ public class GridNioBackPressureControl {
* @return Flag indicating whether current thread is processing message.
*/
public static boolean threadProcessingMessage() {
- return threadProcMsg.get();
+ Thread t = Thread.currentThread();
+
+ if (t instanceof IgniteThread)
+ return ((IgniteThread)t).processingMessage();
+ else
+ return threadProcMsg.get();
}
/**
* @param processing Flag indicating whether current thread is processing message.
*/
public static void threadProcessingMessage(boolean processing) {
- threadProcMsg.set(processing);
+ Thread t = Thread.currentThread();
+
+ if (t instanceof IgniteThread)
+ ((IgniteThread)t).processingMessage(processing);
+ else
+ threadProcMsg.set(processing);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/673daf48/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
index 03ed589..c7a3790 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
@@ -48,6 +48,9 @@ public class IgniteThread extends Thread {
/** Group index. */
private final int grpIdx;
+ /** Message processing flag. */
+ private boolean procMsg;
+
/**
* Creates thread with given worker.
*
@@ -126,6 +129,20 @@ public class IgniteThread extends Thread {
}
/**
+ * @return Message processing flag.
+ */
+ public boolean processingMessage() {
+ return procMsg;
+ }
+
+ /**
+ * @param procMsg Message processing flag.
+ */
+ public void processingMessage(boolean procMsg) {
+ this.procMsg = procMsg;
+ }
+
+ /**
* Creates new thread name.
*
* @param num Thread number.