You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/09 10:27:43 UTC
[28/50] [abbrv] ignite git commit: IGNITE-2333: Striped lock for
GridDhtPartitionTopologyImpl. This closes #462.
IGNITE-2333: Striped lock for GridDhtPartitionTopologyImpl. This closes #462.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/12ab3818
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/12ab3818
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/12ab3818
Branch: refs/heads/ignite-1786
Commit: 12ab3818b4c355b8650fb4a0951914fbd3989ea3
Parents: 9200ec8
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Wed Mar 2 18:34:05 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Mar 2 18:34:05 2016 +0300
----------------------------------------------------------------------
.../dht/GridDhtPartitionTopologyImpl.java | 12 +-
.../util/StripedCompositeReadWriteLock.java | 164 +++++++++++++++++++
2 files changed, 166 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/12ab3818/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index bf2d2c6..b3786cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -29,7 +29,6 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
@@ -44,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
@@ -101,7 +101,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
private final GridAtomicLong updateSeq = new GridAtomicLong(1);
/** Lock. */
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final StripedCompositeReadWriteLock lock = new StripedCompositeReadWriteLock(16);
/** Partition update counter. */
private Map<Integer, Long> cntrMap = new HashMap<>();
@@ -1093,8 +1093,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @return Checks if any of the local partitions need to be evicted.
*/
private boolean checkEvictions(long updateSeq) {
- assert lock.isWriteLockedByCurrentThread();
-
boolean changed = false;
UUID locId = cctx.nodeId();
@@ -1169,7 +1167,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
*/
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) {
- assert lock.isWriteLockedByCurrentThread();
assert nodeId.equals(cctx.nodeId());
// In case if node joins, get topology at the time of joining node.
@@ -1223,7 +1220,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
*/
private void removeNode(UUID nodeId) {
assert nodeId != null;
- assert lock.writeLock().isHeldByCurrentThread();
ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
@@ -1288,8 +1284,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public void onEvicted(GridDhtLocalPartition part, boolean updateSeq) {
- assert updateSeq || lock.isWriteLockedByCurrentThread();
-
lock.writeLock().lock();
try {
@@ -1425,8 +1419,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
*/
private void consistencyCheck() {
if (CONSISTENCY_CHECK) {
- assert lock.writeLock().isHeldByCurrentThread();
-
if (node2part == null)
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/12ab3818/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
new file mode 100644
index 0000000..e28a5f8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * ReadWriteLock with striping mechanics.
+ * Compared to {@link ReentrantReadWriteLock} it has slightly improved performance of {@link ReadWriteLock#readLock()}
+ * operations at the cost of {@link ReadWriteLock#writeLock()} operations and memory consumption.
+ * It also supports reentrancy semantics like {@link ReentrantReadWriteLock}.
+ */
+public class StripedCompositeReadWriteLock implements ReadWriteLock {
+ /** Index generator. */
+ private static final AtomicInteger IDX_GEN = new AtomicInteger();
+
+ /** Index. */
+ private static final ThreadLocal<Integer> IDX = new ThreadLocal<Integer>() {
+ @Override protected Integer initialValue() {
+ return IDX_GEN.incrementAndGet();
+ }
+ };
+
+ /** Locks. */
+ private final ReentrantReadWriteLock[] locks;
+
+ /** Composite write lock. */
+ private final WriteLock writeLock;
+
+ /**
+ * Creates a new instance with given concurrency level.
+ *
+ * @param concurrencyLvl Number of internal read locks.
+ */
+ public StripedCompositeReadWriteLock(int concurrencyLvl) {
+ locks = new ReadLock[concurrencyLvl];
+
+ for (int i = 0; i < concurrencyLvl; i++)
+ locks[i] = new ReadLock();
+
+ writeLock = new WriteLock();
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public Lock readLock() {
+ int idx = IDX.get() % locks.length;
+
+ return locks[idx].readLock();
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public Lock writeLock() {
+ return writeLock;
+ }
+
+ /**
+ * Read lock.
+ */
+ @SuppressWarnings("unused")
+ private static class ReadLock extends ReentrantReadWriteLock {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Padding. */
+ private long p0, p1, p2, p3, p4, p5, p6, p7;
+ }
+
+ /**
+ * Write lock.
+ */
+ private class WriteLock implements Lock {
+ /** {@inheritDoc} */
+ @Override public void lock() {
+ try {
+ lock0(false);
+ }
+ catch (InterruptedException ignore) {
+ assert false : "Should never happen";
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void lockInterruptibly() throws InterruptedException {
+ lock0(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unlock() {
+ unlock0(locks.length - 1);
+ }
+
+ /**
+ * Internal lock routine.
+ *
+ * @param canInterrupt Whether to acquire the lock interruptibly.
+ * @throws InterruptedException
+ */
+ private void lock0(boolean canInterrupt) throws InterruptedException {
+ int i = 0;
+
+ try {
+ for (; i < locks.length; i++) {
+ if (canInterrupt)
+ locks[i].writeLock().lockInterruptibly();
+ else
+ locks[i].writeLock().lock();
+ }
+ }
+ catch (InterruptedException e) {
+ unlock0(i - 1);
+
+ throw e;
+ }
+ }
+
+ /**
+ * Internal unlock routine.
+ *
+ * @param fromIdx Start index.
+ */
+ private void unlock0(int fromIdx) {
+ for (int i = fromIdx; i >= 0; i--)
+ locks[i].writeLock().unlock();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean tryLock() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("NullableProblems")
+ @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public Condition newCondition() {
+ throw new UnsupportedOperationException();
+ }
+ }
+}