You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/03 07:26:21 UTC

[2/6] ignite git commit: IGNITE-2333: Striped lock for GridDhtPartitionTopologyImpl. This closes #462.

IGNITE-2333: Striped lock for GridDhtPartitionTopologyImpl. This closes #462.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/12ab3818
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/12ab3818
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/12ab3818

Branch: refs/heads/ignite-2745
Commit: 12ab3818b4c355b8650fb4a0951914fbd3989ea3
Parents: 9200ec8
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Wed Mar 2 18:34:05 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Mar 2 18:34:05 2016 +0300

----------------------------------------------------------------------
 .../dht/GridDhtPartitionTopologyImpl.java       |  12 +-
 .../util/StripedCompositeReadWriteLock.java     | 164 +++++++++++++++++++
 2 files changed, 166 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/12ab3818/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index bf2d2c6..b3786cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
@@ -44,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -101,7 +101,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private final GridAtomicLong updateSeq = new GridAtomicLong(1);
 
     /** Lock. */
-    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final StripedCompositeReadWriteLock lock = new StripedCompositeReadWriteLock(16);
 
     /** Partition update counter. */
     private Map<Integer, Long> cntrMap = new HashMap<>();
@@ -1093,8 +1093,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @return Checks if any of the local partitions need to be evicted.
      */
     private boolean checkEvictions(long updateSeq) {
-        assert lock.isWriteLockedByCurrentThread();
-
         boolean changed = false;
 
         UUID locId = cctx.nodeId();
@@ -1169,7 +1167,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) {
-        assert lock.isWriteLockedByCurrentThread();
         assert nodeId.equals(cctx.nodeId());
 
         // In case if node joins, get topology at the time of joining node.
@@ -1223,7 +1220,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      */
     private void removeNode(UUID nodeId) {
         assert nodeId != null;
-        assert lock.writeLock().isHeldByCurrentThread();
 
         ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
 
@@ -1288,8 +1284,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public void onEvicted(GridDhtLocalPartition part, boolean updateSeq) {
-        assert updateSeq || lock.isWriteLockedByCurrentThread();
-
         lock.writeLock().lock();
 
         try {
@@ -1425,8 +1419,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      */
     private void consistencyCheck() {
         if (CONSISTENCY_CHECK) {
-            assert lock.writeLock().isHeldByCurrentThread();
-
             if (node2part == null)
                 return;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/12ab3818/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
new file mode 100644
index 0000000..e28a5f8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * {@link ReadWriteLock} striped over an array of {@link ReentrantReadWriteLock}s: a reader locks only the stripe
+ * picked by its per-thread index, a writer locks every stripe. Compared to {@link ReentrantReadWriteLock} this is
+ * meant to speed up {@link ReadWriteLock#readLock()} at the cost of {@link ReadWriteLock#writeLock()} and memory.
+ * Reentrancy comes from the stripes; the write lock does not support {@code tryLock} or {@code newCondition}.
+ */
+public class StripedCompositeReadWriteLock implements ReadWriteLock {
+    /** Counter shared by all instances; each thread draws its index from it on first use. */
+    private static final AtomicInteger IDX_GEN = new AtomicInteger();
+
+    /** Per-thread index drawn once from {@link #IDX_GEN}. NOTE(review): would turn negative if the counter overflows. */
+    private static final ThreadLocal<Integer> IDX = new ThreadLocal<Integer>() {
+        @Override protected Integer initialValue() {
+            return IDX_GEN.incrementAndGet();
+        }
+    };
+
+    /** Stripes; a reader uses the element at its thread index modulo the array length. */
+    private final ReentrantReadWriteLock[] locks;
+
+    /** Write lock that acquires the write lock of every stripe. */
+    private final WriteLock writeLock;
+
+    /**
+     * Creates a new instance with the given number of stripes.
+     *
+     * @param concurrencyLvl Number of internal locks. NOTE(review): not validated; 0 makes readLock() divide by zero.
+     */
+    public StripedCompositeReadWriteLock(int concurrencyLvl) {
+        locks = new ReadLock[concurrencyLvl];
+
+        for (int i = 0; i < concurrencyLvl; i++)
+            locks[i] = new ReadLock();
+
+        writeLock = new WriteLock();
+    }
+
+    /** {@inheritDoc} */
+    @NotNull @Override public Lock readLock() {
+        int idx = IDX.get() % locks.length; // Stripe assigned to the calling thread.
+
+        return locks[idx].readLock();
+    }
+
+    /** {@inheritDoc} */
+    @NotNull @Override public Lock writeLock() {
+        return writeLock;
+    }
+
+    /**
+     * Single stripe: a {@link ReentrantReadWriteLock} carrying extra, unused {@code long} fields.
+     */
+    @SuppressWarnings("unused")
+    private static class ReadLock extends ReentrantReadWriteLock {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Padding, never read; presumably keeps stripes on separate cache lines (TODO confirm). */
+        private long p0, p1, p2, p3, p4, p5, p6, p7;
+    }
+
+    /**
+     * Composite write lock over all stripes; only lock, lockInterruptibly and unlock are implemented.
+     */
+    private class WriteLock implements Lock {
+        /** {@inheritDoc} */
+        @Override public void lock() {
+            try {
+                lock0(false);
+            }
+            catch (InterruptedException ignore) {
+                assert false : "Should never happen";
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void lockInterruptibly() throws InterruptedException {
+            lock0(true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void unlock() {
+            unlock0(locks.length - 1);
+        }
+
+        /**
+         * Acquires the stripe write locks in ascending index order, releasing those already held if interrupted.
+         *
+         * @param canInterrupt Whether to acquire the lock interruptibly.
+         * @throws InterruptedException If interrupted while acquiring a stripe with {@code canInterrupt} set.
+         */
+        private void lock0(boolean canInterrupt) throws InterruptedException {
+            int i = 0;
+
+            try {
+                for (; i < locks.length; i++) {
+                    if (canInterrupt)
+                        locks[i].writeLock().lockInterruptibly();
+                    else
+                        locks[i].writeLock().lock();
+                }
+            }
+            catch (InterruptedException e) {
+                unlock0(i - 1); // Stripe i itself was not acquired.
+
+                throw e;
+            }
+        }
+
+        /**
+         * Releases the stripe write locks in descending order, from the given index down to zero.
+         *
+         * @param fromIdx Index of the highest stripe to unlock; a negative value unlocks nothing.
+         */
+        private void unlock0(int fromIdx) {
+            for (int i = fromIdx; i >= 0; i--)
+                locks[i].writeLock().unlock();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean tryLock() {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("NullableProblems")
+        @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @NotNull @Override public Condition newCondition() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}