You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/04 14:29:43 UTC

[3/3] ignite git commit: Spin-lock on hot cache IO manager path.

Spin-lock on hot cache IO manager path.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/78dd3ea4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/78dd3ea4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/78dd3ea4

Branch: refs/heads/ignite-atomic-good-lock-bench
Commit: 78dd3ea427fad9cad334443b3f6dfbb6d4b07a51
Parents: 673daf4
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Mar 4 16:29:26 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 4 16:29:26 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    | 85 ++++++--------------
 .../internal/util/GridStripedSpinBusyLock.java  | 74 ++++++++++++-----
 2 files changed, 80 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/78dd3ea4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index b297827..0cd34be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -60,7 +59,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridLeanSet;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.GridStripedSpinBusyLock;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
@@ -102,11 +101,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     private ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> orderedHandlers =
         new ConcurrentHashMap8<>();
 
-    /** Stopping flag. */
-    private boolean stopping;
-
     /** Mutex. */
-    private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
+    private final GridStripedSpinBusyLock lock = new GridStripedSpinBusyLock();
 
     /** Deployment enabled. */
     private boolean depEnabled;
@@ -245,32 +241,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         for (Object ordTopic : orderedHandlers.keySet())
             cctx.gridIO().removeMessageListener(ordTopic);
 
-        boolean interrupted = false;
-
-        // Busy wait is intentional.
-        while (true) {
-            try {
-                if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS))
-                    break;
-                else
-                    Thread.sleep(200);
-            }
-            catch (InterruptedException ignore) {
-                // Preserve interrupt status & ignore.
-                // Note that interrupted flag is cleared.
-                interrupted = true;
-            }
-        }
-
-        if (interrupted)
-            Thread.currentThread().interrupt();
-
-        try {
-            stopping = true;
-        }
-        finally {
-            rw.writeUnlock();
-        }
+        lock.block();
     }
 
     /**
@@ -281,39 +252,35 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
     private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg,
         final IgniteBiInClosure<UUID, GridCacheMessage> c) {
-        rw.readLock();
+        if (lock.enterBusy()) {
+            try {
+                if (depEnabled)
+                    cctx.deploy().ignoreOwnership(true);
 
-        try {
-            if (stopping) {
-                if (log.isDebugEnabled())
-                    log.debug("Received cache communication message while stopping (will ignore) [nodeId=" +
-                        nodeId + ", msg=" + cacheMsg + ']');
+                unmarshall(nodeId, cacheMsg);
 
-                return;
+                if (cacheMsg.classError() != null)
+                    processFailedMessage(nodeId, cacheMsg, c);
+                else
+                    processMessage(nodeId, cacheMsg, c);
             }
+            catch (Throwable e) {
+                U.error(log, "Failed to process message [senderId=" + nodeId +
+                    ", messageType=" + cacheMsg.getClass() + ']', e);
 
-            if (depEnabled)
-                cctx.deploy().ignoreOwnership(true);
-
-            unmarshall(nodeId, cacheMsg);
-
-            if (cacheMsg.classError() != null)
-                processFailedMessage(nodeId, cacheMsg, c);
-            else
-                processMessage(nodeId, cacheMsg, c);
-        }
-        catch (Throwable e) {
-            U.error(log, "Failed to process message [senderId=" + nodeId + ", messageType=" + cacheMsg.getClass() + ']', e);
-
-            if (e instanceof Error)
-                throw (Error)e;
-        }
-        finally {
-            if (depEnabled)
-                cctx.deploy().ignoreOwnership(false);
+                if (e instanceof Error)
+                    throw (Error)e;
+            }
+            finally {
+                if (depEnabled)
+                    cctx.deploy().ignoreOwnership(false);
 
-            rw.readUnlock();
+                lock.leaveBusy();
+            }
         }
+        else if (log.isDebugEnabled())
+            log.debug("Received cache communication message while stopping (will ignore) [nodeId=" + nodeId +
+                ", msg=" + cacheMsg + ']');
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/78dd3ea4/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java
index a11b0b1..a7b9da5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java
@@ -17,8 +17,11 @@
 
 package org.apache.ignite.internal.util;
 
-import java.util.Random;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.thread.IgniteThread;
+
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
 
 /**
  * Striped spin busy lock. Aimed to provide efficient "read" lock semantics while still maintaining safety when
@@ -31,15 +34,21 @@ public class GridStripedSpinBusyLock {
     /** Default amount of stripes. */
     private static final int DFLT_STRIPE_CNT = Runtime.getRuntime().availableProcessors() * 4;
 
+    /** Thread index generator. */
+    private static final AtomicInteger THREAD_IDX_GEN = new AtomicInteger();
+
     /** Thread index. */
-    private static ThreadLocal<Integer> THREAD_IDX = new ThreadLocal<Integer>() {
+    private static final ThreadLocal<Integer> THREAD_IDX = new ThreadLocal<Integer>() {
         @Override protected Integer initialValue() {
-            return new Random().nextInt(Integer.MAX_VALUE);
+            return THREAD_IDX_GEN.incrementAndGet();
         }
     };
 
-    /** States; they are not subjects to false-sharing because actual values are located far from each other. */
-    private final AtomicInteger[] states;
+    /** Amount of stripes. */
+    private final int stripeCnt;
+
+    /** States. */
+    private final AtomicIntegerArray states;
 
     /**
      * Default constructor.
@@ -54,10 +63,12 @@ public class GridStripedSpinBusyLock {
      * @param stripeCnt Amount of stripes.
      */
     public GridStripedSpinBusyLock(int stripeCnt) {
-        states = new AtomicInteger[stripeCnt];
+        A.ensure(stripeCnt > 0, "stripeCnt > 0");
+
+        this.stripeCnt = stripeCnt;
 
-        for (int i = 0; i < stripeCnt; i++)
-            states[i] = new AtomicInteger();
+        // Each state must be located 64 bytes from the other to avoid false sharing.
+        states = new AtomicIntegerArray(adjusted(stripeCnt));
     }
 
     /**
@@ -66,7 +77,7 @@ public class GridStripedSpinBusyLock {
      * @return {@code True} if entered busy state.
      */
     public boolean enterBusy() {
-        int val = state().incrementAndGet();
+        int val = states.incrementAndGet(index());
 
         if ((val & WRITER_MASK) == WRITER_MASK) {
             leaveBusy();
@@ -81,7 +92,7 @@ public class GridStripedSpinBusyLock {
      * Leave busy state.
      */
     public void leaveBusy() {
-        state().decrementAndGet();
+        states.decrementAndGet(index());
     }
 
     /**
@@ -89,11 +100,13 @@ public class GridStripedSpinBusyLock {
      */
     public void block() {
         // 1. CAS-loop to set a writer bit.
-        for (AtomicInteger state : states) {
+        for (int i = 0; i < stripeCnt; i++) {
+            int idx = adjusted(i);
+
             while (true) {
-                int oldVal = state.get();
+                int oldVal = states.get(idx);
 
-                if (state.compareAndSet(oldVal, oldVal | WRITER_MASK))
+                if (states.compareAndSet(idx, oldVal, oldVal | WRITER_MASK))
                     break;
             }
         }
@@ -101,8 +114,10 @@ public class GridStripedSpinBusyLock {
         // 2. Wait until all readers are out.
         boolean interrupt = false;
 
-        for (AtomicInteger state : states) {
-            while (state.get() != WRITER_MASK) {
+        for (int i = 0; i < stripeCnt; i++) {
+            int idx = adjusted(i);
+
+            while (states.get(idx) != WRITER_MASK) {
                 try {
                     Thread.sleep(10);
                 }
@@ -117,11 +132,30 @@ public class GridStripedSpinBusyLock {
     }
 
     /**
-     * Gets state of thread's stripe.
+     * Gets the index in the states array used by the current thread.
+     *
+     * @return Index in the states array for the current thread.
+     */
+    private int index() {
+        Thread t = Thread.currentThread();
+
+        if (t instanceof IgniteThread) {
+            int idx = ((IgniteThread) t).groupIndex();
+
+            if (idx != IgniteThread.GRP_IDX_UNASSIGNED)
+                return idx;
+        }
+
+        return adjusted(THREAD_IDX.get() % stripeCnt);
+    }
+
+    /**
+     * Converts a stripe number to its states array index; stripes are spaced 16 ints (64 bytes) apart.
      *
-     * @return State.
+     * @param val Stripe number.
+     * @return Index of the stripe in the states array ({@code val << 4}).
      */
-    private AtomicInteger state() {
-        return states[THREAD_IDX.get() % states.length];
+    private static int adjusted(int val) {
+        return val << 4;
     }
-}
+}
\ No newline at end of file