You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/04 14:29:41 UTC

[1/3] ignite git commit: Reverted invalid changes.

Repository: ignite
Updated Branches:
  refs/heads/ignite-atomic-good-lock-bench a86c4c7d6 -> 78dd3ea42


Reverted invalid changes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2d6c6c9b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2d6c6c9b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2d6c6c9b

Branch: refs/heads/ignite-atomic-good-lock-bench
Commit: 2d6c6c9bd4ce2379edd32e8e472d52c183995920
Parents: a86c4c7
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Mar 4 16:10:40 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 4 16:10:40 2016 +0300

----------------------------------------------------------------------
 .../internal/processors/cache/GridCacheIoManager.java   |  6 +++---
 .../distributed/dht/atomic/GridDhtAtomicCache.java      | 12 ++++++------
 2 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2d6c6c9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 6cb3510..b297827 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -281,7 +281,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
     private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg,
         final IgniteBiInClosure<UUID, GridCacheMessage> c) {
-        //rw.readLock();
+        rw.readLock();
 
         try {
             if (stopping) {
@@ -312,7 +312,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             if (depEnabled)
                 cctx.deploy().ignoreOwnership(false);
 
-            //rw.readUnlock();
+            rw.readUnlock();
         }
     }
 
@@ -619,7 +619,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             cctx.mvcc().contextReset();
 
             // Unwind eviction notifications.
-            //CU.unwindEvicts(cctx);
+            CU.unwindEvicts(cctx);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2d6c6c9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 2ca54c3..e908c05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2611,12 +2611,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             // Optimization.
             return;
 
-//        // Must touch all entries since update may have deleted entries.
-//        // Eviction manager will remove empty entries.
-//        for (GridCacheMapEntry entry : locked) {
-//            if (entry != null && (skip == null || !skip.contains(entry.key())))
-//                ctx.evicts().touch(entry, topVer);
-//        }
+        // Must touch all entries since update may have deleted entries.
+        // Eviction manager will remove empty entries.
+        for (GridCacheMapEntry entry : locked) {
+            if (entry != null && (skip == null || !skip.contains(entry.key())))
+                ctx.evicts().touch(entry, topVer);
+        }
     }
 
     /**


[3/3] ignite git commit: Spin-lock on hot cache IO manager path.

Posted by vo...@apache.org.
Spin-lock on hot cache IO manager path.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/78dd3ea4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/78dd3ea4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/78dd3ea4

Branch: refs/heads/ignite-atomic-good-lock-bench
Commit: 78dd3ea427fad9cad334443b3f6dfbb6d4b07a51
Parents: 673daf4
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Mar 4 16:29:26 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 4 16:29:26 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    | 85 ++++++--------------
 .../internal/util/GridStripedSpinBusyLock.java  | 74 ++++++++++++-----
 2 files changed, 80 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/78dd3ea4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index b297827..0cd34be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -60,7 +59,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridLeanSet;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.GridStripedSpinBusyLock;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
@@ -102,11 +101,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     private ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> orderedHandlers =
         new ConcurrentHashMap8<>();
 
-    /** Stopping flag. */
-    private boolean stopping;
-
     /** Mutex. */
-    private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
+    private final GridStripedSpinBusyLock lock = new GridStripedSpinBusyLock();
 
     /** Deployment enabled. */
     private boolean depEnabled;
@@ -245,32 +241,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         for (Object ordTopic : orderedHandlers.keySet())
             cctx.gridIO().removeMessageListener(ordTopic);
 
-        boolean interrupted = false;
-
-        // Busy wait is intentional.
-        while (true) {
-            try {
-                if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS))
-                    break;
-                else
-                    Thread.sleep(200);
-            }
-            catch (InterruptedException ignore) {
-                // Preserve interrupt status & ignore.
-                // Note that interrupted flag is cleared.
-                interrupted = true;
-            }
-        }
-
-        if (interrupted)
-            Thread.currentThread().interrupt();
-
-        try {
-            stopping = true;
-        }
-        finally {
-            rw.writeUnlock();
-        }
+        lock.block();
     }
 
     /**
@@ -281,39 +252,35 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
     private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg,
         final IgniteBiInClosure<UUID, GridCacheMessage> c) {
-        rw.readLock();
+        if (lock.enterBusy()) {
+            try {
+                if (depEnabled)
+                    cctx.deploy().ignoreOwnership(true);
 
-        try {
-            if (stopping) {
-                if (log.isDebugEnabled())
-                    log.debug("Received cache communication message while stopping (will ignore) [nodeId=" +
-                        nodeId + ", msg=" + cacheMsg + ']');
+                unmarshall(nodeId, cacheMsg);
 
-                return;
+                if (cacheMsg.classError() != null)
+                    processFailedMessage(nodeId, cacheMsg, c);
+                else
+                    processMessage(nodeId, cacheMsg, c);
             }
+            catch (Throwable e) {
+                U.error(log, "Failed to process message [senderId=" + nodeId +
+                    ", messageType=" + cacheMsg.getClass() + ']', e);
 
-            if (depEnabled)
-                cctx.deploy().ignoreOwnership(true);
-
-            unmarshall(nodeId, cacheMsg);
-
-            if (cacheMsg.classError() != null)
-                processFailedMessage(nodeId, cacheMsg, c);
-            else
-                processMessage(nodeId, cacheMsg, c);
-        }
-        catch (Throwable e) {
-            U.error(log, "Failed to process message [senderId=" + nodeId + ", messageType=" + cacheMsg.getClass() + ']', e);
-
-            if (e instanceof Error)
-                throw (Error)e;
-        }
-        finally {
-            if (depEnabled)
-                cctx.deploy().ignoreOwnership(false);
+                if (e instanceof Error)
+                    throw (Error)e;
+            }
+            finally {
+                if (depEnabled)
+                    cctx.deploy().ignoreOwnership(false);
 
-            rw.readUnlock();
+                lock.leaveBusy();
+            }
         }
+        else if (log.isDebugEnabled())
+            log.debug("Received cache communication message while stopping (will ignore) [nodeId=" + nodeId +
+                ", msg=" + cacheMsg + ']');
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/78dd3ea4/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java
index a11b0b1..a7b9da5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java
@@ -17,8 +17,11 @@
 
 package org.apache.ignite.internal.util;
 
-import java.util.Random;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.thread.IgniteThread;
+
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
 
 /**
  * Striped spin busy lock. Aimed to provide efficient "read" lock semantics while still maintaining safety when
@@ -31,15 +34,21 @@ public class GridStripedSpinBusyLock {
     /** Default amount of stripes. */
     private static final int DFLT_STRIPE_CNT = Runtime.getRuntime().availableProcessors() * 4;
 
+    /** Thread index generator. */
+    private static final AtomicInteger THREAD_IDX_GEN = new AtomicInteger();
+
     /** Thread index. */
-    private static ThreadLocal<Integer> THREAD_IDX = new ThreadLocal<Integer>() {
+    private static final ThreadLocal<Integer> THREAD_IDX = new ThreadLocal<Integer>() {
         @Override protected Integer initialValue() {
-            return new Random().nextInt(Integer.MAX_VALUE);
+            return THREAD_IDX_GEN.incrementAndGet();
         }
     };
 
-    /** States; they are not subjects to false-sharing because actual values are located far from each other. */
-    private final AtomicInteger[] states;
+    /** Amount of stripes. */
+    private final int stripeCnt;
+
+    /** States. */
+    private final AtomicIntegerArray states;
 
     /**
      * Default constructor.
@@ -54,10 +63,12 @@ public class GridStripedSpinBusyLock {
      * @param stripeCnt Amount of stripes.
      */
     public GridStripedSpinBusyLock(int stripeCnt) {
-        states = new AtomicInteger[stripeCnt];
+        A.ensure(stripeCnt > 0, "stripeCnt > 0");
+
+        this.stripeCnt = stripeCnt;
 
-        for (int i = 0; i < stripeCnt; i++)
-            states[i] = new AtomicInteger();
+        // Each state must be located 64 bytes from the other to avoid false sharing.
+        states = new AtomicIntegerArray(adjusted(stripeCnt));
     }
 
     /**
@@ -66,7 +77,7 @@ public class GridStripedSpinBusyLock {
      * @return {@code True} if entered busy state.
      */
     public boolean enterBusy() {
-        int val = state().incrementAndGet();
+        int val = states.incrementAndGet(index());
 
         if ((val & WRITER_MASK) == WRITER_MASK) {
             leaveBusy();
@@ -81,7 +92,7 @@ public class GridStripedSpinBusyLock {
      * Leave busy state.
      */
     public void leaveBusy() {
-        state().decrementAndGet();
+        states.decrementAndGet(index());
     }
 
     /**
@@ -89,11 +100,13 @@ public class GridStripedSpinBusyLock {
      */
     public void block() {
         // 1. CAS-loop to set a writer bit.
-        for (AtomicInteger state : states) {
+        for (int i = 0; i < stripeCnt; i++) {
+            int idx = adjusted(i);
+
             while (true) {
-                int oldVal = state.get();
+                int oldVal = states.get(idx);
 
-                if (state.compareAndSet(oldVal, oldVal | WRITER_MASK))
+                if (states.compareAndSet(idx, oldVal, oldVal | WRITER_MASK))
                     break;
             }
         }
@@ -101,8 +114,10 @@ public class GridStripedSpinBusyLock {
         // 2. Wait until all readers are out.
         boolean interrupt = false;
 
-        for (AtomicInteger state : states) {
-            while (state.get() != WRITER_MASK) {
+        for (int i = 0; i < stripeCnt; i++) {
+            int idx = adjusted(i);
+
+            while (states.get(idx) != WRITER_MASK) {
                 try {
                     Thread.sleep(10);
                 }
@@ -117,11 +132,30 @@ public class GridStripedSpinBusyLock {
     }
 
     /**
-     * Gets state of thread's stripe.
+     * Get index for the given thread.
+     *
+     * @return Index for the given thread.
+     */
+    private int index() {
+        Thread t = Thread.currentThread();
+
+        if (t instanceof IgniteThread) {
+            int idx = ((IgniteThread) t).groupIndex();
+
+            if (idx != IgniteThread.GRP_IDX_UNASSIGNED)
+                return idx;
+        }
+
+        return adjusted(THREAD_IDX.get() % stripeCnt);
+    }
+
+    /**
+     * Gets value adjusted for striping.
      *
-     * @return State.
+     * @param val Value.
+     * @return Value.
      */
-    private AtomicInteger state() {
-        return states[THREAD_IDX.get() % states.length];
+    private static int adjusted(int val) {
+        return val << 4;
     }
-}
+}
\ No newline at end of file


[2/3] ignite git commit: Optimized back-pressure control.

Posted by vo...@apache.org.
Optimized back-pressure control.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/673daf48
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/673daf48
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/673daf48

Branch: refs/heads/ignite-atomic-good-lock-bench
Commit: 673daf48b5a394c5a68dcf151cb13e6c09428928
Parents: 2d6c6c9
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Mar 4 16:11:04 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 4 16:11:04 2016 +0300

----------------------------------------------------------------------
 .../util/nio/GridNioBackPressureControl.java       | 16 ++++++++++++++--
 .../org/apache/ignite/thread/IgniteThread.java     | 17 +++++++++++++++++
 2 files changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/673daf48/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java
index 96a1ab3..4d69533 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.util.nio;
 
+import org.apache.ignite.thread.IgniteThread;
+
 /**
  * Utility class that allows to ignore back-pressure control for threads that are processing messages.
  */
@@ -32,13 +34,23 @@ public class GridNioBackPressureControl {
      * @return Flag indicating whether current thread is processing message.
      */
     public static boolean threadProcessingMessage() {
-        return threadProcMsg.get();
+        Thread t = Thread.currentThread();
+
+        if (t instanceof IgniteThread)
+            return ((IgniteThread)t).processingMessage();
+        else
+            return threadProcMsg.get();
     }
 
     /**
      * @param processing Flag indicating whether current thread is processing message.
      */
     public static void threadProcessingMessage(boolean processing) {
-        threadProcMsg.set(processing);
+        Thread t = Thread.currentThread();
+
+        if (t instanceof IgniteThread)
+            ((IgniteThread)t).processingMessage(processing);
+        else
+            threadProcMsg.set(processing);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/673daf48/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
index 03ed589..c7a3790 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
@@ -48,6 +48,9 @@ public class IgniteThread extends Thread {
     /** Group index. */
     private final int grpIdx;
 
+    /** Message processing flag. */
+    private boolean procMsg;
+
     /**
      * Creates thread with given worker.
      *
@@ -126,6 +129,20 @@ public class IgniteThread extends Thread {
     }
 
     /**
+     * @return Message processing flag.
+     */
+    public boolean processingMessage() {
+        return procMsg;
+    }
+
+    /**
+     * @param procMsg Message processing flag.
+     */
+    public void processingMessage(boolean procMsg) {
+        this.procMsg = procMsg;
+    }
+
+    /**
      * Creates new thread name.
      *
      * @param num Thread number.