You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2016/12/10 09:27:14 UTC
ignite git commit: applied striped rw lock
Repository: ignite
Updated Branches:
refs/heads/ignite-comm-balance-master 3b0c302e4 -> 131340f27
applied striped rw lock
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/131340f2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/131340f2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/131340f2
Branch: refs/heads/ignite-comm-balance-master
Commit: 131340f279e537b2fe83588b67673828578ca320
Parents: 3b0c302
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Sat Dec 10 16:26:58 2016 +0700
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Sat Dec 10 16:26:58 2016 +0700
----------------------------------------------------------------------
.../ignite/internal/GridKernalGatewayImpl.java | 25 ++++++----
.../managers/communication/GridIoManager.java | 22 +++++----
.../processors/cache/GridCacheGateway.java | 48 ++++++++++---------
.../processors/cache/GridCacheIoManager.java | 16 ++++---
.../util/StripedCompositeReadWriteLock.java | 50 ++++++++++++++++----
.../org/apache/ignite/thread/IgniteThread.java | 26 ++++++++--
6 files changed, 129 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/131340f2/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
index fe8c580..7cbf84a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
@@ -22,9 +22,11 @@ import java.io.Serializable;
import java.io.StringWriter;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -40,7 +42,8 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
/** */
@GridToStringExclude
- private final GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock();
+ private final ReadWriteLock rwLock =
+ new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors());
/** */
@GridToStringExclude
@@ -73,13 +76,15 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
if (stackTrace == null)
stackTrace = stackTrace();
- rwLock.readLock();
+ Lock lock = rwLock.readLock();
+
+ lock.lock();
GridKernalState state = this.state.get();
if (state != GridKernalState.STARTED) {
// Unlock just acquired lock.
- rwLock.readUnlock();
+ lock.unlock();
if (state == GridKernalState.DISCONNECTED) {
assert reconnectFut != null;
@@ -96,7 +101,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
if (stackTrace == null)
stackTrace = stackTrace();
- rwLock.readLock();
+ rwLock.readLock().lock();
if (state.get() == GridKernalState.DISCONNECTED)
throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + gridName);
@@ -104,7 +109,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
/** {@inheritDoc} */
@Override public void readUnlock() {
- rwLock.readUnlock();
+ rwLock.readLock().unlock();
}
/** {@inheritDoc} */
@@ -118,7 +123,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
// Busy wait is intentional.
while (true)
try {
- if (rwLock.tryWriteLock(200, TimeUnit.MILLISECONDS))
+ if (rwLock.writeLock().tryLock(200, TimeUnit.MILLISECONDS))
break;
else
Thread.sleep(200);
@@ -135,7 +140,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
/** {@inheritDoc} */
@Override public boolean tryWriteLock(long timeout) throws InterruptedException {
- boolean acquired = rwLock.tryWriteLock(timeout, TimeUnit.MILLISECONDS);
+ boolean acquired = rwLock.writeLock().tryLock(timeout, TimeUnit.MILLISECONDS);
if (acquired) {
if (stackTrace == null)
@@ -194,7 +199,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
/** {@inheritDoc} */
@Override public void writeUnlock() {
- rwLock.writeUnlock();
+ rwLock.writeLock().unlock();
}
/** {@inheritDoc} */
@@ -222,4 +227,4 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
@Override public String toString() {
return S.toString(GridKernalGatewayImpl.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/131340f2/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 7ef7bc0..0703a3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
@@ -57,7 +58,7 @@ import org.apache.ignite.internal.processors.platform.message.PlatformMessageFil
import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridTuple3;
@@ -160,7 +161,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
private final Marshaller marsh;
/** Busy lock. */
- private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
+ private final ReadWriteLock busyLock =
+ new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors());
/** Lock to sync maps access. */
private final ReadWriteLock lock = new ReentrantReadWriteLock();
@@ -577,7 +579,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
// Busy wait is intentional.
while (true) {
try {
- if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS))
+ if (busyLock.writeLock().tryLock(200, TimeUnit.MILLISECONDS))
break;
else
Thread.sleep(200);
@@ -601,7 +603,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
stopping = true;
}
finally {
- busyLock.writeUnlock();
+ busyLock.writeLock().unlock();
}
}
@@ -623,7 +625,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
assert nodeId != null;
assert msg != null;
- busyLock.readLock();
+ Lock busyLock0 = busyLock.readLock();
+
+ busyLock0.lock();
try {
if (stopping) {
@@ -712,7 +716,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
U.error(log, "Failed to process message (will ignore): " + msg, e);
}
finally {
- busyLock.readUnlock();
+ busyLock0.unlock();
}
}
@@ -2162,7 +2166,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return;
}
- busyLock.readLock();
+ Lock lock = busyLock.readLock();
+
+ lock.lock();
try {
if (stopping) {
@@ -2240,7 +2246,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
}
finally {
- busyLock.readUnlock();
+ lock.unlock();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/131340f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index 1562d70..1bf9468 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -17,13 +17,15 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -45,7 +47,8 @@ public class GridCacheGateway<K, V> {
private IgniteFuture<?> reconnectFut;
/** */
- private GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock();
+ private StripedCompositeReadWriteLock rwLock =
+ new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors());
/**
* @param ctx Cache context.
@@ -63,7 +66,7 @@ public class GridCacheGateway<K, V> {
if (ctx.deploymentEnabled())
ctx.deploy().onEnter();
- rwLock.readLock();
+ rwLock.readLock().lock();
checkState(true, true);
}
@@ -78,7 +81,7 @@ public class GridCacheGateway<K, V> {
if (state != State.STARTED) {
if (lock)
- rwLock.readUnlock();
+ rwLock.readLock().unlock();
if (state == State.STOPPED) {
if (stopErr)
@@ -106,7 +109,7 @@ public class GridCacheGateway<K, V> {
onEnter();
// Must unlock in case of unexpected errors to avoid deadlocks during kernal stop.
- rwLock.readLock();
+ rwLock.readLock().lock();
return checkState(true, false);
}
@@ -139,10 +142,10 @@ public class GridCacheGateway<K, V> {
*/
public void leave() {
try {
- leaveNoLock();
+ leaveNoLock();
}
finally {
- rwLock.readUnlock();
+ rwLock.readLock().unlock();
}
}
@@ -168,7 +171,9 @@ public class GridCacheGateway<K, V> {
onEnter();
- rwLock.readLock();
+ Lock lock = rwLock.readLock();
+
+ lock.lock();
checkState(true, true);
@@ -178,7 +183,7 @@ public class GridCacheGateway<K, V> {
return setOperationContextPerCall(opCtx);
}
catch (Throwable e) {
- rwLock.readUnlock();
+ lock.unlock();
throw e;
}
@@ -219,7 +224,7 @@ public class GridCacheGateway<K, V> {
leaveNoLock(prev);
}
finally {
- rwLock.readUnlock();
+ rwLock.readLock().unlock();
}
}
@@ -269,14 +274,14 @@ public class GridCacheGateway<K, V> {
*
*/
public void writeLock(){
- rwLock.writeLock();
+ rwLock.writeLock().lock();
}
/**
*
*/
public void writeUnlock() {
- rwLock.writeUnlock();
+ rwLock.writeLock().unlock();
}
/**
@@ -295,15 +300,14 @@ public class GridCacheGateway<K, V> {
boolean interrupted = false;
while (true) {
- if (rwLock.tryWriteLock())
- break;
- else {
- try {
+ try {
+ if (rwLock.writeLock().tryLock(200, TimeUnit.MILLISECONDS))
+ break;
+ else
U.sleep(200);
- }
- catch (IgniteInterruptedCheckedException ignore) {
- interrupted = true;
- }
+ }
+ catch (IgniteInterruptedCheckedException | InterruptedException ignore) {
+ interrupted = true;
}
}
@@ -314,7 +318,7 @@ public class GridCacheGateway<K, V> {
state.set(State.STOPPED);
}
finally {
- rwLock.writeUnlock();
+ rwLock.writeLock().unlock();
}
}
@@ -331,4 +335,4 @@ public class GridCacheGateway<K, V> {
/** */
STOPPED
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/131340f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 924ce79..3975f92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -74,7 +75,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAwa
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridLeanSet;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
@@ -120,7 +121,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
private boolean stopping;
/** Mutex. */
- private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
+ private final StripedCompositeReadWriteLock rw =
+ new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors());
/** Deployment enabled. */
private boolean depEnabled;
@@ -316,7 +318,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
// Busy wait is intentional.
while (true) {
try {
- if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS))
+ if (rw.writeLock().tryLock(200, TimeUnit.MILLISECONDS))
break;
else
Thread.sleep(200);
@@ -335,7 +337,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
stopping = true;
}
finally {
- rw.writeUnlock();
+ rw.writeLock().unlock();
}
}
@@ -347,7 +349,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
@SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg,
final IgniteBiInClosure<UUID, GridCacheMessage> c) {
- rw.readLock();
+ Lock lock = rw.readLock();
+
+ lock.lock();
try {
if (stopping) {
@@ -378,7 +382,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (depEnabled)
cctx.deploy().ignoreOwnership(false);
- rw.readUnlock();
+ lock.unlock();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/131340f2/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
index 12940e6..e215663 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
@@ -17,15 +17,14 @@
package org.apache.ignite.internal.util;
-import org.apache.ignite.thread.IgniteThread;
-import org.jetbrains.annotations.NotNull;
-
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.NotNull;
/**
* ReadWriteLock with striping mechanics.
@@ -69,10 +68,15 @@ public class StripedCompositeReadWriteLock implements ReadWriteLock {
int idx;
if (Thread.currentThread() instanceof IgniteThread) {
- idx = ((IgniteThread)Thread.currentThread()).groupIndex();
+ IgniteThread igniteThread = (IgniteThread)Thread.currentThread();
+
+ idx = igniteThread.compositeRwLockIndex();
+
+ if (idx == IgniteThread.GRP_IDX_UNASSIGNED) {
+ idx = IDX_GEN.incrementAndGet();
- if (idx == IgniteThread.GRP_IDX_UNASSIGNED)
- idx = IDX.get();
+ igniteThread.compositeRwLockIndex(idx);
+ }
}
else
idx = IDX.get();
@@ -135,7 +139,7 @@ public class StripedCompositeReadWriteLock implements ReadWriteLock {
* Internal lock routine.
*
* @param canInterrupt Whether to acquire the lock interruptibly.
- * @throws InterruptedException
+ * @throws InterruptedException If interrupted.
*/
private void lock0(boolean canInterrupt) throws InterruptedException {
int i = 0;
@@ -167,13 +171,41 @@ public class StripedCompositeReadWriteLock implements ReadWriteLock {
/** {@inheritDoc} */
@Override public boolean tryLock() {
- throw new UnsupportedOperationException();
+ // Number of stripes whose write lock has been acquired so far (also the next stripe to try).
+ int i = 0;
+
+ try {
+ // Take the write lock of every stripe in order without blocking; stop at the first busy one.
+ for (; i < locks.length; i++) {
+ if (!locks[i].writeLock().tryLock())
+ break;
+ }
+ }
+ finally {
+ // Partial acquisition only: i == 0 means nothing was taken, i == locks.length means full success.
+ // NOTE(review): unlock0(i - 1) presumably releases the stripes acquired so far (indexes 0 .. i - 1) - confirm against unlock0.
+ if (0 < i && i < locks.length)
+ unlock0(i - 1);
+ }
+
+ // Acquired only if the write lock of every stripe was taken.
+ return i == locks.length;
}
/** {@inheritDoc} */
@SuppressWarnings("NullableProblems")
@Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
- throw new UnsupportedOperationException();
+ // The timeout is a single budget shared by all stripes, not a per-stripe wait. A non-positive
+ // timeout is clamped to zero: each stripe is then attempted once without waiting, which is what
+ // Lock.tryLock(long, TimeUnit) specifies for time <= 0.
+ long budget = Math.max(0L, unit.toNanos(time));
+
+ // Elapsed time is measured as a difference of nanoTime() readings. Unlike the absolute deadline
+ // "toNanos(time) + nanoTime()", this cannot overflow for very large timeouts.
+ long start = System.nanoTime();
+
+ // Number of stripes whose write lock has been acquired so far (also the next stripe to try).
+ int i = 0;
+
+ try {
+ for (; i < locks.length; i++) {
+ // Wait only for what is left of the budget; a value <= 0 means "try once, do not wait".
+ long remaining = budget - (System.nanoTime() - start);
+
+ if (!locks[i].writeLock().tryLock(remaining, TimeUnit.NANOSECONDS))
+ break;
+ }
+ }
+ finally {
+ // Timeout or interruption part-way through: same rollback call as the untimed tryLock().
+ // i == 0 means nothing was taken, i == locks.length means full success.
+ if (0 < i && i < locks.length)
+ unlock0(i - 1);
+ }
+
+ // Acquired only if the write lock of every stripe was taken within the budget.
+ return i == locks.length;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/131340f2/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
index 03ed589..0f987e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
@@ -18,6 +18,7 @@
package org.apache.ignite.thread;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.worker.GridWorker;
@@ -48,6 +49,9 @@ public class IgniteThread extends Thread {
/** Group index. */
private final int grpIdx;
+ /** */
+ private int compositeRwLockIdx;
+
/**
* Creates thread with given worker.
*
@@ -93,8 +97,10 @@ public class IgniteThread extends Thread {
public IgniteThread(ThreadGroup grp, String gridName, String threadName, Runnable r, int grpIdx) {
super(grp, r, createName(cntr.incrementAndGet(), threadName, gridName));
+ A.ensure(grpIdx >= -1, "grpIdx >= -1");
+
this.gridName = gridName;
- this.grpIdx = grpIdx;
+ this.grpIdx = compositeRwLockIdx = grpIdx;
}
/**
@@ -106,7 +112,7 @@ public class IgniteThread extends Thread {
super(threadGrp, threadName);
this.gridName = gridName;
- this.grpIdx = GRP_IDX_UNASSIGNED;
+ this.grpIdx = compositeRwLockIdx = GRP_IDX_UNASSIGNED;
}
/**
@@ -126,6 +132,20 @@ public class IgniteThread extends Thread {
}
/**
+ * Gets the index that {@code StripedCompositeReadWriteLock.readLock()} reads for this thread.
+ * The constructors initialize it to the group index, or to {@code GRP_IDX_UNASSIGNED} when no
+ * group index is given.
+ *
+ * @return Composite RW lock index.
+ */
+ public int compositeRwLockIndex() {
+ return compositeRwLockIdx;
+ }
+
+ /**
+ * Stores the index. {@code StripedCompositeReadWriteLock.readLock()} calls this with a newly
+ * generated value when the current one equals {@code GRP_IDX_UNASSIGNED}.
+ * <p>
+ * NOTE(review): the backing field is a plain {@code int} (not volatile). The visible caller only
+ * touches it through {@code Thread.currentThread()} - confirm there is no cross-thread use.
+ *
+ * @param compositeRwLockIdx Composite RW lock index.
+ */
+ public void compositeRwLockIndex(int compositeRwLockIdx) {
+ this.compositeRwLockIdx = compositeRwLockIdx;
+ }
+
+
+ /**
* Creates new thread name.
*
* @param num Thread number.
@@ -141,4 +161,4 @@ public class IgniteThread extends Thread {
@Override public String toString() {
return S.toString(IgniteThread.class, this, "name", getName());
}
-}
\ No newline at end of file
+}