You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/11/13 11:15:20 UTC
[2/2] ignite git commit: Ignite-perftest - Prepare optimizations for
merge.
Ignite-perftest - Prepare optimizations for merge.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e58604a4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e58604a4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e58604a4
Branch: refs/heads/ignite-perftest-merge
Commit: e58604a4aa6c2c0ec9a756ac40a5aae4af5621bc
Parents: d12674a
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Nov 13 13:15:05 2015 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Nov 13 13:15:05 2015 +0300
----------------------------------------------------------------------
.../ignite/internal/GridKernalGatewayImpl.java | 8 +-
.../managers/communication/GridIoManager.java | 52 +-
.../processors/cache/GridCacheGateway.java | 25 +-
.../processors/cache/GridCacheIoManager.java | 34 +-
.../processors/cache/GridCacheMapEntry.java | 16 +-
.../processors/cache/GridCacheMvcc.java | 8 -
.../distributed/dht/GridDhtLockFuture.java | 3 +-
.../dht/GridDhtPartitionTopologyImpl.java | 3 -
.../distributed/dht/GridDhtTxPrepareFuture.java | 2 +-
.../cache/distributed/dht/GridDhtTxRemote.java | 3 +-
...arOptimisticSerializableTxPrepareFuture.java | 4 +-
.../cache/distributed/near/GridNearTxLocal.java | 12 -
.../clock/GridClockSyncProcessor.java | 2 +-
.../internal/util/GridSpinReadWriteLock.java | 522 +++++++++----------
.../ignite/internal/util/nio/GridNioServer.java | 12 +-
.../util/nio/GridSelectorNioSessionImpl.java | 42 +-
16 files changed, 356 insertions(+), 392 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
index dbf2f73..fe8c580 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
@@ -73,13 +73,13 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
if (stackTrace == null)
stackTrace = stackTrace();
-// rwLock.readLock();
+ rwLock.readLock();
GridKernalState state = this.state.get();
if (state != GridKernalState.STARTED) {
// Unlock just acquired lock.
-// rwLock.readUnlock();
+ rwLock.readUnlock();
if (state == GridKernalState.DISCONNECTED) {
assert reconnectFut != null;
@@ -96,7 +96,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
if (stackTrace == null)
stackTrace = stackTrace();
-// rwLock.readLock();
+ rwLock.readLock();
if (state.get() == GridKernalState.DISCONNECTED)
throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + gridName);
@@ -104,7 +104,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
/** {@inheritDoc} */
@Override public void readUnlock() {
-// rwLock.readUnlock();
+ rwLock.readUnlock();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index a14a05a..b8af8da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -167,10 +167,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
private final Marshaller marsh;
/** Busy lock. */
-// private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
+ private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
/** Lock to sync maps access. */
-// private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
/** Fully started flag. When set to true, can send and receive messages. */
private volatile boolean started;
@@ -396,7 +396,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
// Clean up delayed and ordered messages (need exclusive lock).
-// lock.writeLock().lock();
+ lock.writeLock().lock();
try {
ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(nodeId);
@@ -406,7 +406,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
"(sender node left topology): " + waitList);
}
finally {
-// lock.writeLock().unlock();
+ lock.writeLock().unlock();
}
break;
@@ -424,7 +424,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
// 1. Process wait list.
Collection<Collection<DelayedMessage>> delayedMsgs = new ArrayList<>();
-// lock.writeLock().lock();
+ lock.writeLock().lock();
try {
started = true;
@@ -442,7 +442,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
}
finally {
-// lock.writeLock().unlock();
+ lock.writeLock().unlock();
}
// After write lock released.
@@ -501,19 +501,19 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
boolean interrupted = false;
// Busy wait is intentional.
-// while (true) {
-// try {
-// if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS))
-// break;
-// else
-// Thread.sleep(200);
-// }
-// catch (InterruptedException ignore) {
-// // Preserve interrupt status & ignore.
-// // Note that interrupted flag is cleared.
-// interrupted = true;
-// }
-// }
+ while (true) {
+ try {
+ if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS))
+ break;
+ else
+ Thread.sleep(200);
+ }
+ catch (InterruptedException ignore) {
+ // Preserve interrupt status & ignore.
+ // Note that interrupted flag is cleared.
+ interrupted = true;
+ }
+ }
try {
if (interrupted)
@@ -529,7 +529,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
stopping = true;
}
finally {
-// busyLock.writeUnlock();
+ busyLock.writeUnlock();
}
}
@@ -553,7 +553,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
assert nodeId != null;
assert msg != null;
-// busyLock.readLock();
+ busyLock.readLock();
try {
if (stopping) {
@@ -581,7 +581,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
if (!started) {
-// lock.readLock().lock();
+ lock.readLock().lock();
try {
if (!started) { // Sets to true in write lock, so double checking.
@@ -601,7 +601,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
}
finally {
-// lock.readLock().unlock();
+ lock.readLock().unlock();
}
}
@@ -649,7 +649,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
U.error(log, "Failed to process message (will ignore): " + msg, e);
}
finally {
-// busyLock.readUnlock();
+ busyLock.readUnlock();
}
}
@@ -2001,7 +2001,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return;
}
-// busyLock.readLock();
+ busyLock.readLock();
try {
if (stopping) {
@@ -2077,7 +2077,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
}
finally {
-// busyLock.readUnlock();
+ busyLock.readUnlock();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index 8a1f0c3..1562d70 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -63,9 +63,9 @@ public class GridCacheGateway<K, V> {
if (ctx.deploymentEnabled())
ctx.deploy().onEnter();
-// rwLock.readLock();
+ rwLock.readLock();
-// checkState(true, true);
+ checkState(true, true);
}
/**
@@ -106,11 +106,9 @@ public class GridCacheGateway<K, V> {
onEnter();
// Must unlock in case of unexpected errors to avoid deadlocks during kernal stop.
-// rwLock.readLock();
-//
-// return checkState(true, false);
+ rwLock.readLock();
- return true;
+ return checkState(true, false);
}
/**
@@ -121,8 +119,7 @@ public class GridCacheGateway<K, V> {
public boolean enterIfNotStoppedNoLock() {
onEnter();
-// return checkState(false, false);
- return true;
+ return checkState(false, false);
}
/**
@@ -145,7 +142,7 @@ public class GridCacheGateway<K, V> {
leaveNoLock();
}
finally {
-// rwLock.readUnlock();
+ rwLock.readUnlock();
}
}
@@ -171,9 +168,9 @@ public class GridCacheGateway<K, V> {
onEnter();
-// rwLock.readLock();
-//
-// checkState(true, true);
+ rwLock.readLock();
+
+ checkState(true, true);
// Must unlock in case of unexpected errors to avoid
// deadlocks during kernal stop.
@@ -181,7 +178,7 @@ public class GridCacheGateway<K, V> {
return setOperationContextPerCall(opCtx);
}
catch (Throwable e) {
-// rwLock.readUnlock();
+ rwLock.readUnlock();
throw e;
}
@@ -222,7 +219,7 @@ public class GridCacheGateway<K, V> {
leaveNoLock(prev);
}
finally {
-// rwLock.readUnlock();
+ rwLock.readUnlock();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 1a118a7..2334780 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -101,7 +101,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
private boolean stopping;
/** Mutex. */
-// private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
+ private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
/** Deployment enabled. */
private boolean depEnabled;
@@ -218,19 +218,19 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
boolean interrupted = false;
// Busy wait is intentional.
-// while (true) {
-// try {
-// if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS))
-// break;
-// else
-// Thread.sleep(200);
-// }
-// catch (InterruptedException ignore) {
-// // Preserve interrupt status & ignore.
-// // Note that interrupted flag is cleared.
-// interrupted = true;
-// }
-// }
+ while (true) {
+ try {
+ if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS))
+ break;
+ else
+ Thread.sleep(200);
+ }
+ catch (InterruptedException ignore) {
+ // Preserve interrupt status & ignore.
+ // Note that interrupted flag is cleared.
+ interrupted = true;
+ }
+ }
if (interrupted)
Thread.currentThread().interrupt();
@@ -239,7 +239,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
stopping = true;
}
finally {
-// rw.writeUnlock();
+ rw.writeUnlock();
}
}
@@ -251,7 +251,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
@SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg,
final IgniteBiInClosure<UUID, GridCacheMessage> c) {
-// rw.readLock();
+ rw.readLock();
try {
if (stopping) {
@@ -282,7 +282,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (depEnabled)
cctx.deploy().ignoreOwnership(false);
-// rw.readUnlock();
+ rw.readUnlock();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index b40ab6a..df9f5c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -298,17 +298,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheObject val0 = val;
- if (val0 == null) {
- if (hasOffHeapPointer()) {
- IgniteBiTuple<byte[], Byte> t = valueBytes0();
+ if (val0 == null && hasOffHeapPointer()) {
+ IgniteBiTuple<byte[], Byte> t = valueBytes0();
- return cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1());
- }
- }
- else if (val0 instanceof CacheObjectImpl) {
- CacheObjectImpl im = (CacheObjectImpl)val0;
-
- val0 = new CacheObjectImpl(im.val, im.valBytes);
+ return cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1());
}
return val0;
@@ -2854,8 +2847,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/** {@inheritDoc} */
@Override public KeyCacheObject key() {
-// return key;
- return new KeyCacheObjectImpl(((KeyCacheObjectImpl)key).val, ((KeyCacheObjectImpl)key).valBytes);
+ return key;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
index 543923a..adcbf92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
@@ -1370,12 +1370,4 @@ public final class GridCacheMvcc {
@Override public String toString() { // Synchronize to ensure one-thread at a time.
return S.toString(GridCacheMvcc.class, this);
}
-
- public static void main(String[] args) {
- ArrayList<String> col1 = new ArrayList<>(5);
-
- for (int i = 0; i < 5; i++) {
- col1.add("" + i);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index d86a11d..219d841 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -380,10 +380,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
* @return Lock candidate.
* @throws GridCacheEntryRemovedException If entry was removed.
* @throws GridDistributedLockCancelledException If lock is canceled.
- * @throws IgniteCheckedException If failed.
*/
@Nullable public GridCacheMvccCandidate addEntry(GridDhtCacheEntry entry)
- throws GridCacheEntryRemovedException, GridDistributedLockCancelledException, IgniteCheckedException {
+ throws GridCacheEntryRemovedException, GridDistributedLockCancelledException {
if (log.isDebugEnabled())
log.debug("Adding entry: " + entry);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 0afe70b..6bd283a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -686,9 +686,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
@Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
- if (true)
- return affNodes;
-
lock.readLock().lock();
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index dbe69f8..e2939b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -985,7 +985,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
}
}
- catch (GridCacheEntryRemovedException e) {
+ catch (GridCacheEntryRemovedException ignore) {
assert false : "Got removed exception on entry with dht local candidate: " + entries;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 1050086..e268a88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -22,7 +22,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
@@ -209,7 +208,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
this.rmtFutId = rmtFutId;
readMap = Collections.emptyMap();
- writeMap = new LinkedHashMap<>(U.capacity(txSize), 0.75f);
+ writeMap = new ConcurrentLinkedHashMap<>(U.capacity(txSize), 0.75f, 1);
topologyVersion(topVer);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 1ca90dd..9c6cb88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -389,9 +389,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
for (GridDistributedTxMapping m : mappings.values()) {
assert !m.empty();
- MiniFuture fut = new MiniFuture(m);
-
- add(fut);
+ add(new MiniFuture(m));
}
Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 278f6df..c88546b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -482,18 +482,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/**
- * @param nodeId Node ID.
- * @param dhtVer DHT version.
- * @param writeVer Write version.
- */
- void addDhtVersion(UUID nodeId, GridCacheVersion dhtVer, GridCacheVersion writeVer) {
- GridDistributedTxMapping m = mappings.get(nodeId);
-
- if (m != null)
- m.dhtVersion(dhtVer, writeVer);
- }
-
- /**
* @param nodeId Undo mapping.
*/
@Override public boolean removeMapping(UUID nodeId) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
index a33c35e..b5c89cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
@@ -289,7 +289,7 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
long now = clockSrc.currentTimeMillis();
- if (snap == null)
+ if (snap == null)
return now;
Long delta = snap.deltas().get(ctx.localNodeId());
http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
index 115fd80..a1fa892 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
@@ -78,44 +78,44 @@ public class GridSpinReadWriteLock {
*/
@SuppressWarnings("BusyWait")
public void readLock() {
-// int cnt = readLockEntryCnt.get();
-//
-// // Read lock reentry or acquiring read lock while holding write lock.
-// if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) {
-// assert state > 0 || state == -1;
-//
-// readLockEntryCnt.set(cnt + 1);
-//
-// return;
-// }
-//
-// boolean interrupted = false;
-//
-// while (true) {
-// int cur = state;
-//
-// assert cur >= -1;
-//
-// if (cur == -1 || pendingWLocks > 0) {
-// try {
-// Thread.sleep(10);
-// }
-// catch (InterruptedException ignored) {
-// interrupted = true;
-// }
-//
-// continue;
-// }
-//
-// if (compareAndSet(STATE_OFFS, cur, cur + 1)) {
-// if (interrupted)
-// Thread.currentThread().interrupt();
-//
-// break;
-// }
-// }
-//
-// readLockEntryCnt.set(1);
+ int cnt = readLockEntryCnt.get();
+
+ // Read lock reentry or acquiring read lock while holding write lock.
+ if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) {
+ assert state > 0 || state == -1;
+
+ readLockEntryCnt.set(cnt + 1);
+
+ return;
+ }
+
+ boolean interrupted = false;
+
+ while (true) {
+ int cur = state;
+
+ assert cur >= -1;
+
+ if (cur == -1 || pendingWLocks > 0) {
+ try {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException ignored) {
+ interrupted = true;
+ }
+
+ continue;
+ }
+
+ if (compareAndSet(STATE_OFFS, cur, cur + 1)) {
+ if (interrupted)
+ Thread.currentThread().interrupt();
+
+ break;
+ }
+ }
+
+ readLockEntryCnt.set(1);
}
/**
@@ -124,62 +124,60 @@ public class GridSpinReadWriteLock {
* @return {@code true} if acquired.
*/
public boolean tryReadLock() {
-// int cnt = readLockEntryCnt.get();
-//
-// // Read lock reentry or acquiring read lock while holding write lock.
-// if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) {
-// assert state > 0 || state == -1;
-//
-// readLockEntryCnt.set(cnt + 1);
-//
-// return true;
-// }
-//
-// while (true) {
-// int cur = state;
-//
-// if (cur == -1 || pendingWLocks > 0)
-// return false;
-//
-// if (compareAndSet(STATE_OFFS, cur, cur + 1)) {
-// readLockEntryCnt.set(1);
-//
-// return true;
-// }
-// }
-
- return true;
+ int cnt = readLockEntryCnt.get();
+
+ // Read lock reentry or acquiring read lock while holding write lock.
+ if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) {
+ assert state > 0 || state == -1;
+
+ readLockEntryCnt.set(cnt + 1);
+
+ return true;
+ }
+
+ while (true) {
+ int cur = state;
+
+ if (cur == -1 || pendingWLocks > 0)
+ return false;
+
+ if (compareAndSet(STATE_OFFS, cur, cur + 1)) {
+ readLockEntryCnt.set(1);
+
+ return true;
+ }
+ }
}
/**
* Read unlock.
*/
public void readUnlock() {
-// int cnt = readLockEntryCnt.get();
-//
-// if (cnt == 0)
-// throw new IllegalMonitorStateException();
-//
-// // Read unlock when holding write lock is performed here.
-// if (cnt > 1 || Thread.currentThread().getId() == writeLockOwner) {
-// assert state > 0 || state == -1;
-//
-// readLockEntryCnt.set(cnt - 1);
-//
-// return;
-// }
-//
-// while (true) {
-// int cur = state;
-//
-// assert cur > 0;
-//
-// if (compareAndSet(STATE_OFFS, cur, cur - 1)) {
-// readLockEntryCnt.set(0);
-//
-// return;
-// }
-// }
+ int cnt = readLockEntryCnt.get();
+
+ if (cnt == 0)
+ throw new IllegalMonitorStateException();
+
+ // Read unlock when holding write lock is performed here.
+ if (cnt > 1 || Thread.currentThread().getId() == writeLockOwner) {
+ assert state > 0 || state == -1;
+
+ readLockEntryCnt.set(cnt - 1);
+
+ return;
+ }
+
+ while (true) {
+ int cur = state;
+
+ assert cur > 0;
+
+ if (compareAndSet(STATE_OFFS, cur, cur - 1)) {
+ readLockEntryCnt.set(0);
+
+ return;
+ }
+ }
}
/**
@@ -187,95 +185,95 @@ public class GridSpinReadWriteLock {
*/
@SuppressWarnings("BusyWait")
public void writeLock() {
-// long threadId = Thread.currentThread().getId();
-//
-// if (threadId == writeLockOwner) {
-// assert state == -1;
-//
-// writeLockEntryCnt++;
-//
-// return;
-// }
-//
-// // Increment pending write locks.
-// while (true) {
-// int pendingWLocks0 = pendingWLocks;
-//
-// if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1))
-// break;
-// }
-//
-// boolean interrupted = false;
-//
-// while (!compareAndSet(STATE_OFFS, 0, -1)) {
-// try {
-// Thread.sleep(10);
-// }
-// catch (InterruptedException ignored) {
-// interrupted = true;
-// }
-// }
-//
-// // Decrement pending write locks.
-// while (true) {
-// int pendingWLocks0 = pendingWLocks;
-//
-// assert pendingWLocks0 > 0;
-//
-// if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1))
-// break;
-// }
-//
-// if (interrupted)
-// Thread.currentThread().interrupt();
-//
-// assert writeLockOwner == -1;
-//
-// writeLockOwner = threadId;
-// writeLockEntryCnt = 1;
+ long threadId = Thread.currentThread().getId();
+
+ if (threadId == writeLockOwner) {
+ assert state == -1;
+
+ writeLockEntryCnt++;
+
+ return;
+ }
+
+ // Increment pending write locks.
+ while (true) {
+ int pendingWLocks0 = pendingWLocks;
+
+ if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1))
+ break;
+ }
+
+ boolean interrupted = false;
+
+ while (!compareAndSet(STATE_OFFS, 0, -1)) {
+ try {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException ignored) {
+ interrupted = true;
+ }
+ }
+
+ // Decrement pending write locks.
+ while (true) {
+ int pendingWLocks0 = pendingWLocks;
+
+ assert pendingWLocks0 > 0;
+
+ if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1))
+ break;
+ }
+
+ if (interrupted)
+ Thread.currentThread().interrupt();
+
+ assert writeLockOwner == -1;
+
+ writeLockOwner = threadId;
+ writeLockEntryCnt = 1;
}
/**
* Acquires write lock without sleeping between unsuccessful attempts.
*/
public void writeLock0() {
-// long threadId = Thread.currentThread().getId();
-//
-// if (threadId == writeLockOwner) {
-// assert state == -1;
-//
-// writeLockEntryCnt++;
-//
-// return;
-// }
-//
-// // Increment pending write locks.
-// while (true) {
-// int pendingWLocks0 = pendingWLocks;
-//
-// if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1))
-// break;
-// }
-//
-// for (;;) {
-// if (compareAndSet(STATE_OFFS, 0, -1))
-// break;
-// }
-//
-// // Decrement pending write locks.
-// while (true) {
-// int pendingWLocks0 = pendingWLocks;
-//
-// assert pendingWLocks0 > 0;
-//
-// if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1))
-// break;
-// }
-//
-// assert writeLockOwner == -1;
-//
-// writeLockOwner = threadId;
-// writeLockEntryCnt = 1;
+ long threadId = Thread.currentThread().getId();
+
+ if (threadId == writeLockOwner) {
+ assert state == -1;
+
+ writeLockEntryCnt++;
+
+ return;
+ }
+
+ // Increment pending write locks.
+ while (true) {
+ int pendingWLocks0 = pendingWLocks;
+
+ if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1))
+ break;
+ }
+
+ for (;;) {
+ if (compareAndSet(STATE_OFFS, 0, -1))
+ break;
+ }
+
+ // Decrement pending write locks.
+ while (true) {
+ int pendingWLocks0 = pendingWLocks;
+
+ assert pendingWLocks0 > 0;
+
+ if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1))
+ break;
+ }
+
+ assert writeLockOwner == -1;
+
+ writeLockOwner = threadId;
+ writeLockEntryCnt = 1;
}
/**
@@ -291,28 +289,26 @@ public class GridSpinReadWriteLock {
* @return {@code True} if write lock has been acquired.
*/
public boolean tryWriteLock() {
-// long threadId = Thread.currentThread().getId();
-//
-// if (threadId == writeLockOwner) {
-// assert state == -1;
-//
-// writeLockEntryCnt++;
-//
-// return true;
-// }
-//
-// if (compareAndSet(STATE_OFFS, 0, -1)) {
-// assert writeLockOwner == -1;
-//
-// writeLockOwner = threadId;
-// writeLockEntryCnt = 1;
-//
-// return true;
-// }
-//
-// return false;
-
- return true;
+ long threadId = Thread.currentThread().getId();
+
+ if (threadId == writeLockOwner) {
+ assert state == -1;
+
+ writeLockEntryCnt++;
+
+ return true;
+ }
+
+ if (compareAndSet(STATE_OFFS, 0, -1)) {
+ assert writeLockOwner == -1;
+
+ writeLockOwner = threadId;
+ writeLockEntryCnt = 1;
+
+ return true;
+ }
+
+ return false;
}
/**
@@ -323,83 +319,81 @@ public class GridSpinReadWriteLock {
*/
@SuppressWarnings("BusyWait")
public boolean tryWriteLock(long timeout, TimeUnit unit) throws InterruptedException {
-// long threadId = Thread.currentThread().getId();
-//
-// if (threadId == writeLockOwner) {
-// assert state == -1;
-//
-// writeLockEntryCnt++;
-//
-// return true;
-// }
-//
-// try {
-// // Increment pending write locks.
-// while (true) {
-// int pendingWLocks0 = pendingWLocks;
-//
-// if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1))
-// break;
-// }
-//
-// long end = U.currentTimeMillis() + unit.toMillis(timeout);
-//
-// while (true) {
-// if (compareAndSet(STATE_OFFS, 0, -1)) {
-// assert writeLockOwner == -1;
-//
-// writeLockOwner = threadId;
-// writeLockEntryCnt = 1;
-//
-// return true;
-// }
-//
-// Thread.sleep(10);
-//
-// if (end <= U.currentTimeMillis())
-// return false;
-// }
-// }
-// finally {
-// // Decrement pending write locks.
-// while (true) {
-// int pendingWLocks0 = pendingWLocks;
-//
-// assert pendingWLocks0 > 0;
-//
-// if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1))
-// break;
-// }
-// }
-
- return true;
+ long threadId = Thread.currentThread().getId();
+
+ if (threadId == writeLockOwner) {
+ assert state == -1;
+
+ writeLockEntryCnt++;
+
+ return true;
+ }
+
+ try {
+ // Increment pending write locks.
+ while (true) {
+ int pendingWLocks0 = pendingWLocks;
+
+ if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1))
+ break;
+ }
+
+ long end = U.currentTimeMillis() + unit.toMillis(timeout);
+
+ while (true) {
+ if (compareAndSet(STATE_OFFS, 0, -1)) {
+ assert writeLockOwner == -1;
+
+ writeLockOwner = threadId;
+ writeLockEntryCnt = 1;
+
+ return true;
+ }
+
+ Thread.sleep(10);
+
+ if (end <= U.currentTimeMillis())
+ return false;
+ }
+ }
+ finally {
+ // Decrement pending write locks.
+ while (true) {
+ int pendingWLocks0 = pendingWLocks;
+
+ assert pendingWLocks0 > 0;
+
+ if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1))
+ break;
+ }
+ }
}
/**
* Releases write lock.
*/
public void writeUnlock() {
-// long threadId = Thread.currentThread().getId();
-//
-// if (threadId != writeLockOwner)
-// throw new IllegalMonitorStateException();
-//
-// if (writeLockEntryCnt > 1) {
-// writeLockEntryCnt--;
-//
-// return;
-// }
-//
-// writeLockEntryCnt = 0;
-// writeLockOwner = -1;
-//
-// // Current thread holds write and read locks and is releasing
-// // write lock now.
-// int update = readLockEntryCnt.get() > 0 ? 1 : 0;
-//
-// boolean b = compareAndSet(STATE_OFFS, -1, update);
-//
-// assert b;
+ long threadId = Thread.currentThread().getId();
+
+ if (threadId != writeLockOwner)
+ throw new IllegalMonitorStateException();
+
+ if (writeLockEntryCnt > 1) {
+ writeLockEntryCnt--;
+
+ return;
+ }
+
+ writeLockEntryCnt = 0;
+ writeLockOwner = -1;
+
+ // Current thread holds write and read locks and is releasing
+ // write lock now.
+ int update = readLockEntryCnt.get() > 0 ? 1 : 0;
+
+ boolean b = compareAndSet(STATE_OFFS, -1, update);
+
+ assert b;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index fb17cd7..f1aa4a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -424,7 +424,7 @@ public class GridNioServer<T> {
assert ses != null;
assert fut != null;
- boolean wakeup = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut);
+ int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut);
IgniteInClosure<IgniteException> ackClosure;
@@ -432,17 +432,17 @@ public class GridNioServer<T> {
fut.ackClosure(ackClosure);
if (ses.closed()) {
- fut.connectionClosed();
+ if (ses.removeFuture(fut))
+ fut.connectionClosed();
}
- else if (wakeup)
+ else if (msgCnt == 1)
// Change from 0 to 1 means that worker thread should be waken up.
clientWorkers.get(ses.selectorIndex()).offer(fut);
IgniteBiInClosure<GridNioSession, Integer> lsnr0 = msgQueueLsnr;
if (lsnr0 != null)
- // TODO ignite-perftest pass correct queue size.
- lsnr0.apply(ses, 0);
+ lsnr0.apply(ses, msgCnt);
}
/**
@@ -1383,7 +1383,7 @@ public class GridNioServer<T> {
long now = U.currentTimeMillis();
- if (U.currentTimeMillis() - lastIdleCheck > 5000) {
+ if (now - lastIdleCheck > 5000) {
lastIdleCheck = now;
checkIdle(selector.keys());
http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index a2b7565..6b1f6a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -21,14 +21,14 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.Collection;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedDeque8;
/**
* Session implementation bound to selector API and socket API.
@@ -37,7 +37,7 @@ import org.jetbrains.annotations.Nullable;
*/
class GridSelectorNioSessionImpl extends GridNioSessionImpl {
/** Pending write requests. */
- private final ConcurrentLinkedQueue<GridNioFuture<?>> queue = new ConcurrentLinkedQueue<>();
+ private final ConcurrentLinkedDeque8<GridNioFuture<?>> queue = new ConcurrentLinkedDeque8<>();
/** Selection key associated with this session. */
@GridToStringExclude
@@ -47,7 +47,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
private final int selectorIdx;
/** Size counter. */
- private final AtomicBoolean wakeupSelector = new AtomicBoolean();
+ private final AtomicInteger queueSize = new AtomicInteger();
/** Semaphore. */
@GridToStringExclude
@@ -163,14 +163,14 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
* @param writeFut Write request.
* @return Updated size of the queue.
*/
- boolean offerSystemFuture(GridNioFuture<?> writeFut) {
+ int offerSystemFuture(GridNioFuture<?> writeFut) {
writeFut.messageThread(true);
- boolean res = queue.offer(writeFut);
+ boolean res = queue.offerFirst(writeFut);
assert res : "Future was not added to queue";
- return !wakeupSelector.get() && wakeupSelector.compareAndSet(false, true);
+ return queueSize.incrementAndGet();
}
/**
@@ -183,7 +183,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
* @param writeFut Write request to add.
* @return Updated size of the queue.
*/
- boolean offerFuture(GridNioFuture<?> writeFut) {
+ int offerFuture(GridNioFuture<?> writeFut) {
boolean msgThread = GridNioBackPressureControl.threadProcessingMessage();
if (sem != null && !msgThread)
@@ -195,7 +195,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
assert res : "Future was not added to queue";
- return !wakeupSelector.get() && wakeupSelector.compareAndSet(false, true);
+ return queueSize.incrementAndGet();
}
/**
@@ -208,7 +208,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
assert add;
- wakeupSelector.set(false);
+ boolean set = queueSize.compareAndSet(0, futs.size());
+
+ assert set;
}
/**
@@ -217,13 +219,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
@Nullable GridNioFuture<?> pollFuture() {
GridNioFuture<?> last = queue.poll();
- if (last == null) {
- wakeupSelector.set(false);
-
- last = queue.poll();
- }
-
if (last != null) {
+ queueSize.decrementAndGet();
+
if (sem != null && !last.messageThread())
sem.release();
@@ -248,12 +246,22 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
}
/**
+ * @param fut Future.
+ * @return {@code True} if future was removed from queue.
+ */
+ boolean removeFuture(GridNioFuture<?> fut) {
+ assert closed();
+
+ return queue.removeLastOccurrence(fut);
+ }
+
+ /**
* Gets number of write requests in a queue that have not been processed yet.
*
* @return Number of write requests.
*/
int writeQueueSize() {
- return queue.size();
+ return queueSize.get();
}
/** {@inheritDoc} */