You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/11/07 09:02:18 UTC

ignite git commit: IGNITE-6809: Use MPSC queue in striped pool. This closes #2960.

Repository: ignite
Updated Branches:
  refs/heads/master b7f2a8f4a -> 6f94659b5


IGNITE-6809: Use MPSC queue in striped pool. This closes #2960.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6f94659b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6f94659b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6f94659b

Branch: refs/heads/master
Commit: 6f94659b53f056c176b3a3c6dd8f11a5db2ad74f
Parents: b7f2a8f
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Tue Nov 7 12:02:12 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Nov 7 12:02:12 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/util/MpscQueue.java  | 240 +++++++++++++++
 .../ignite/internal/util/StripedExecutor.java   | 304 +++++++++----------
 2 files changed, 377 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6f94659b/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java
new file mode 100644
index 0000000..cc10124
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * Multi-producer/single-consumer (MP-SC) concurrent linked queue based on Dmitry Vyukov's <a href="http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue">
+ * Non-intrusive MPSC node-based queue</a>. A new queue starts with one value-less stub node that is both head and tail.
+ */
+@SuppressWarnings({"WeakerAccess", "PackageVisibleField", "unused"})
+public final class MpscQueue<E> extends Head<E> {
+    /** Padding: 16 unused {@code long} fields declared after {@code head}; presumably there to avoid false sharing. */
+    long p00, p01, p02, p03, p04, p05, p06, p07;
+    long p10, p11, p12, p13, p14, p15, p16, p17;
+
+    public MpscQueue() {
+        Node node = new Node(null);
+
+        tail = node;
+        head = node;
+    }
+}
+
+/**
+ * Head part of {@link MpscQueue}: holds the {@code head} node and the read-side operations (poll, size, toString).
+ */
+@SuppressWarnings({"WeakerAccess", "AtomicFieldUpdaterIssues", "ClassNameDiffersFromFileName"})
+abstract class Head<E> extends PaddingL1<E> {
+    /** Updater used by {@link #poll()} to write the new {@code head} via {@code lazySet}. */
+    private static final AtomicReferenceFieldUpdater<Head, Node> updater =
+        AtomicReferenceFieldUpdater.newUpdater(Head.class, Node.class, "head");
+
+    /** Head node; it carries no element itself, its {@code next} is the first queued element. */
+    protected volatile Node head;
+
+    /**
+     * Removes the first element: reads its value, clears the value in the node and makes that node the new head.
+     *
+     * @return Removed element, or {@code null} if there is no node after the current head.
+     */
+    @SuppressWarnings("unchecked")
+    public E poll() {
+        Node node = peekNode();
+
+        if (node != null) {
+            Object val = node.value();
+
+            node.value(null);
+
+            updater.lazySet(this, node);
+
+            return (E)val;
+        }
+        else
+            return null;
+    }
+
+
+    /**
+     * @return Number of linked nodes with a non-null value, walked from the first element; capped at {@code Integer.MAX_VALUE}.
+     */
+    public int size() {
+        Node node = peekNode();
+
+        int size = 0;
+
+        for (;;) {
+            if (node == null || node.value() == null)
+                break;
+
+            Node next = node.next();
+
+            if (node == next)
+                break;
+
+            node = next;
+
+            if (++size == Integer.MAX_VALUE)
+                break;
+        }
+
+        return size;
+    }
+
+    /** @return Values from the first element onward as {@code [a, b, ...]}; stops at the first {@code null} node or value. */
+    public String toString() {
+        Node node = peekNode();
+
+        StringBuilder sb = new StringBuilder().append('[');
+
+        for (;;) {
+            if (node == null)
+                break;
+
+            Object value = node.value();
+
+            if (value == null)
+                break;
+
+            if(sb.length() > 1)
+                sb.append(',').append(' ');
+
+            sb.append(value);
+
+            Node next = node.next();
+
+            if (node == next)
+                break;
+
+            node = next;
+        }
+
+        return sb.append(']').toString();
+    }
+
+    /**
+     * @return Node after the head (the first element), or {@code null} if head is the tail; spins while head != tail but next is unset.
+     */
+    private Node peekNode() {
+        Node head = this.head;
+        Node next = head.next();
+
+        if (next == null && head != tail) {
+            do {
+                next = head.next();
+            } while (next == null);
+        }
+        return next;
+    }
+}
+
+/**
+ * Padding layer in the class hierarchy between {@link Tail} and {@link Head}; presumably keeps their fields apart to avoid false sharing.
+ */
+@SuppressWarnings({"PackageVisibleField", "ClassNameDiffersFromFileName", "unused"})
+abstract class PaddingL1<E> extends Tail<E> {
+    /** Padding: 16 {@code long} fields that are never read or written in this file. */
+    long p00, p01, p02, p03, p04, p05, p06, p07;
+    long p10, p11, p12, p13, p14, p15, p16, p17;
+}
+
+/**
+ * Tail part of {@link MpscQueue}: holds the {@code tail} node and the {@code offer} operation.
+ */
+@SuppressWarnings({"ClassNameDiffersFromFileName", "AtomicFieldUpdaterIssues"})
+abstract class Tail<E> extends PaddingL0 {
+    /** Updater used by {@link #offer(Object)} to atomically swap in a new {@code tail} via {@code getAndSet}. */
+    private static final AtomicReferenceFieldUpdater<Tail, Node> updater =
+        AtomicReferenceFieldUpdater.newUpdater(Tail.class, Node.class, "tail");
+
+    /** Tail node: the node installed by the latest {@code offer}, or the initial stub node if nothing was offered yet. */
+    protected volatile Node tail;
+
+    /**
+     * Appends an element: atomically swaps a new node in as the tail, then links the previous tail to that node.
+     *
+     * @param e Element; must not be {@code null}, otherwise {@link IllegalArgumentException} is thrown.
+     */
+    public void offer(final E e) {
+        if (e == null)
+            throw new IllegalArgumentException("Null are not allowed.");
+
+        Node newTail = new Node(e);
+
+        Node prevTail = updater.getAndSet(this, newTail);
+
+        prevTail.next(newTail);
+    }
+}
+
+/**
+ * Root of the queue class hierarchy; declares only padding fields placed before {@code tail}, presumably to avoid false sharing.
+ */
+@SuppressWarnings({"PackageVisibleField", "ClassNameDiffersFromFileName", "unused"})
+abstract class PaddingL0 {
+    /** Padding: 16 {@code long} fields that are never read or written in this file. */
+    long p00, p01, p02, p03, p04, p05, p06, p07;
+    long p10, p11, p12, p13, p14, p15, p16, p17;
+}
+
+@SuppressWarnings({"UnusedDeclaration", "ClassNameDiffersFromFileName"})
+final class Node {
+    /** Updater used by {@link #next(Node)} to write the {@code next} field via {@code lazySet}. */
+    private static final AtomicReferenceFieldUpdater<Node, Node> updater =
+        AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");
+
+    /** Element stored in this node (plain, non-volatile field); {@code null} when the node carries no element. */
+    private Object val;
+
+    /** Successor node, or {@code null} while no successor has been linked. */
+    private volatile Node next;
+
+    /**
+     * Creates a node that has no successor yet.
+     *
+     * @param val Value to store; {@code null} is accepted (the queue's initial stub node is created this way).
+     */
+    Node(Object val) {
+        this.val = val;
+    }
+
+    /**
+     * Links the successor node; the write to {@code next} goes through {@code lazySet}.
+     *
+     * @param next Successor node.
+     */
+    void next(Node next) {
+        updater.lazySet(this, next);
+    }
+
+    /** @return Value currently stored in this node, possibly {@code null}. */
+    Object value() {
+        return val;
+    }
+
+    void value(Object val) {
+        this.val = val;
+    }
+
+    /** @return Successor node, or {@code null} if none has been linked yet. */
+    Node next() {
+        return next;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f94659b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 630d34c..1f9b08d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -22,34 +22,36 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
+import java.util.Random;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.NotNull;
+import org.jsr166.ConcurrentLinkedDeque8;
 
 /**
  * Striped executor.
  */
 public class StripedExecutor implements ExecutorService {
+    /** */
+    private static final int SPIN_CNT = 2048;
+
     /** Stripes. */
     private final Stripe[] stripes;
 
@@ -76,12 +78,11 @@ public class StripedExecutor implements ExecutorService {
      * @param log Logger.
      * @param stealTasks {@code True} to steal tasks.
      */
-    public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log, boolean stealTasks) {
+    public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log,
+        boolean stealTasks) {
         A.ensure(cnt > 0, "cnt > 0");
 
-        boolean success = false;
-
-        stripes = new Stripe[cnt];
+        stripes = stealTasks ? StealingStripe.create(igniteInstanceName, poolName, log, cnt) : new Stripe[cnt];
 
         completedCntrs = new long[cnt];
 
@@ -89,17 +90,12 @@ public class StripedExecutor implements ExecutorService {
 
         this.log = log;
 
+        boolean success = false;
+
         try {
-            for (int i = 0; i < cnt; i++) {
-                stripes[i] = stealTasks ? new StripeConcurrentQueue(
-                    igniteInstanceName,
-                    poolName,
-                    i,
-                    log, stripes) : new StripeConcurrentQueue(
-                        igniteInstanceName,
-                        poolName,
-                        i,
-                        log);
+            if(!stealTasks) {
+                for (int i = 0; i < cnt; i++)
+                    stripes[i] = new MpscQueueBasedStripe(igniteInstanceName, poolName, i, log);
             }
 
             for (int i = 0; i < cnt; i++)
@@ -409,7 +405,7 @@ public class StripedExecutor implements ExecutorService {
     /**
      * Stripe.
      */
-    private static abstract class Stripe implements Runnable {
+    private abstract static class Stripe implements Runnable {
         /** */
         private final String igniteInstanceName;
 
@@ -440,7 +436,7 @@ public class StripedExecutor implements ExecutorService {
          * @param idx Stripe index.
          * @param log Logger.
          */
-        public Stripe(
+        Stripe(
             String igniteInstanceName,
             String poolName,
             int idx,
@@ -493,10 +489,8 @@ public class StripedExecutor implements ExecutorService {
         /** {@inheritDoc} */
         @Override public void run() {
             while (!stopping) {
-                Runnable cmd;
-
                 try {
-                    cmd = take();
+                    Runnable cmd = take();
 
                     if (cmd != null) {
                         active = true;
@@ -552,52 +546,79 @@ public class StripedExecutor implements ExecutorService {
         }
     }
 
-    /**
-     * Stripe.
-     */
-    private static class StripeConcurrentQueue extends Stripe {
+    /** */
+    private static final class StealingStripe extends Stripe {
         /** */
         private static final int IGNITE_TASKS_STEALING_THRESHOLD =
             IgniteSystemProperties.getInteger(
                 IgniteSystemProperties.IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD, 4);
 
-        /** Queue. */
-        private final Queue<Runnable> queue;
+        /** */
+        @GridToStringExclude
+        private final Deque<Runnable>[] queues;
+
+        /** */
+        @GridToStringExclude
+        private final IgniteRunnable unpark;
 
         /** */
         @GridToStringExclude
-        private final Stripe[] others;
+        private Random rnd;
 
         /** */
-        private volatile boolean parked;
+        private final Deque<Runnable> queue;
 
-        /**
-         * @param igniteInstanceName Ignite instance name.
-         * @param poolName Pool name.
-         * @param idx Stripe index.
-         * @param log Logger.
-         */
-        StripeConcurrentQueue(
-            String igniteInstanceName,
-            String poolName,
-            int idx,
-            IgniteLogger log
-        ) {
-            this(igniteInstanceName, poolName, idx, log, null);
+        /** */
+        private final AtomicBoolean parked = new AtomicBoolean();
+
+        /** Creates {@code poolSize} stripes, each with its own deque, sharing a callback that unparks one parked stripe. */
+        @SuppressWarnings("unchecked")
+        static Stripe[] create(String igniteInstanceName, String poolName, IgniteLogger log, final int poolSize) {
+            final StealingStripe[] stripes = new StealingStripe[poolSize];
+            Deque<Runnable>[] queues = new Deque[poolSize];
+
+            IgniteRunnable unpark = new IgniteRunnable() {
+                @Override public void run() {
+                    int init = ThreadLocalRandom.current().nextInt(poolSize);
+
+                    for (int cur = init;;) {
+                        AtomicBoolean parked = stripes[cur].parked;
+
+                        if (parked.get() && parked.compareAndSet(true, false)) {
+                            LockSupport.unpark(stripes[cur].thread);
+
+                            break;
+                        }
+
+                        if ((cur = (cur + 1) % poolSize) == init)
+                            break;
+                    }
+                }
+            };
+
+            for (int i = 0; i < poolSize; i++) {
+                queues[i] = new ConcurrentLinkedDeque8<>();
+                stripes[i] = new StealingStripe(i, igniteInstanceName, poolName, log, queues, unpark);
+            }
+
+            return stripes;
+        }
 
         /**
+         * @param idx Stripe index.
          * @param igniteInstanceName Ignite instance name.
          * @param poolName Pool name.
-         * @param idx Stripe index.
          * @param log Logger.
+         * @param queues Other queues to steal tasks from.
+         * @param unpark Unpark callback, unparks random parked stripe from the pool.
          */
-        StripeConcurrentQueue(
+        private StealingStripe(
+            int idx,
             String igniteInstanceName,
             String poolName,
-            int idx,
             IgniteLogger log,
-            Stripe[] others
+            Deque<Runnable>[] queues,
+            IgniteRunnable unpark
         ) {
             super(
                 igniteInstanceName,
@@ -605,73 +626,62 @@ public class StripedExecutor implements ExecutorService {
                 idx,
                 log);
 
-            this.others = others;
+            this.queues = queues;
+            this.unpark = unpark;
 
-            this.queue = others == null ? new ConcurrentLinkedQueue<Runnable>() : new ConcurrentLinkedDeque<Runnable>();
+            queue = queues[idx];
         }
 
         /** {@inheritDoc} */
         @Override Runnable take() throws InterruptedException {
-            Runnable r;
-
-            for (int i = 0; i < 2048; i++) {
-                r = queue.poll();
+            Runnable task;
 
-                if (r != null)
-                    return r;
+            for (int i = 0; i < SPIN_CNT; i++) {
+                if ((task = queue.poll()) != null)
+                    return task;
             }
 
-            parked = true;
+            for (;;) {
+                parked.set(true);
 
-            try {
-                for (;;) {
-                    r = queue.poll();
+                if ((task = queue.poll()) != null) {
+                    parked.set(false);
 
-                    if (r != null)
-                        return r;
+                    return task;
+                }
 
-                    if(others != null) {
-                        int len = others.length;
-                        int init = ThreadLocalRandom.current().nextInt(len);
-                        int cur = init;
+                int len = queues.length, init = random().nextInt(len);
 
-                        while (true) {
-                            if(cur != idx) {
-                                Deque<Runnable> queue = (Deque<Runnable>) ((StripeConcurrentQueue) others[cur]).queue;
+                for (int cur = init;;) {
+                    if(cur != idx) {
+                        Deque<Runnable> queue = queues[cur];
 
-                                if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD && (r = queue.pollLast()) != null)
-                                    return r;
-                            }
+                        if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD && (task = queue.pollLast()) != null) {
+                            parked.set(false);
 
-                            if ((cur = (cur + 1) % len) == init)
-                                break;
+                            return task;
                         }
                     }
 
-                    LockSupport.park();
-
-                    if (Thread.interrupted())
-                        throw new InterruptedException();
+                    if ((cur = (cur + 1) % len) == init)
+                        break;
                 }
-            }
-            finally {
-                parked = false;
+
+                LockSupport.park();
+
+                if (Thread.interrupted())
+                    throw new InterruptedException();
             }
         }
 
         /** {@inheritDoc} */
-        void execute(Runnable cmd) {
+        @Override void execute(Runnable cmd) {
             queue.add(cmd);
 
-            if (parked)
+            if (parked.get() && parked.compareAndSet(true, false))
                 LockSupport.unpark(thread);
-
-            if(others != null && queueSize() > IGNITE_TASKS_STEALING_THRESHOLD) {
-                for (Stripe other : others) {
-                    if(((StripeConcurrentQueue)other).parked)
-                        LockSupport.unpark(other.thread);
-                }
-            }
+            else if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD)
+                unpark.run();
         }
 
         /** {@inheritDoc} */
@@ -684,18 +694,24 @@ public class StripedExecutor implements ExecutorService {
             return queue.size();
         }
 
+        /** @return Cached {@link Random}; assigned from {@link ThreadLocalRandom#current()} on the first call. */
+        private Random random() {
+            return rnd == null ? rnd = ThreadLocalRandom.current() : rnd;
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(StripeConcurrentQueue.class, this, super.toString());
+            return S.toString(StealingStripe.class, this, super.toString());
         }
     }
 
-    /**
-     * Stripe.
-     */
-    private static class StripeConcurrentQueueNoPark extends Stripe {
-        /** Queue. */
-        private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
+    /** */
+    private static final class MpscQueueBasedStripe extends Stripe {
+        /** */
+        private final AtomicBoolean parked = new AtomicBoolean();
+
+        /** */
+        private final MpscQueue<Runnable> queue = new MpscQueue<>();
 
         /**
          * @param igniteInstanceName Ignite instance name.
@@ -703,82 +719,41 @@ public class StripedExecutor implements ExecutorService {
          * @param idx Stripe index.
          * @param log Logger.
          */
-        public StripeConcurrentQueueNoPark(
-            String igniteInstanceName,
-            String poolName,
-            int idx,
-            IgniteLogger log
-        ) {
-            super(igniteInstanceName,
-                poolName,
-                idx,
-                log);
+        private MpscQueueBasedStripe(String igniteInstanceName, String poolName, int idx, IgniteLogger log) {
+            super(igniteInstanceName, poolName, idx, log);
         }
 
         /** {@inheritDoc} */
-        @Override Runnable take() {
-            for (;;) {
-                Runnable r = queue.poll();
-
-                if (r != null)
-                    return r;
-            }
-        }
+        @Override void execute(Runnable cmd) {
+            queue.offer(cmd);
 
-        /** {@inheritDoc} */
-        void execute(Runnable cmd) {
-            queue.add(cmd);
+            if (parked.get() && parked.compareAndSet(true, false))
+                LockSupport.unpark(thread);
         }
 
         /** {@inheritDoc} */
-        @Override int queueSize() {
-            return queue.size();
-        }
+        @Override Runnable take() throws InterruptedException {
+            Runnable task;
 
-        /** {@inheritDoc} */
-        @Override String queueToString() {
-            return String.valueOf(queue);
-        }
+            for (int i = 0; i < SPIN_CNT; i++) {
+                if ((task = queue.poll()) != null)
+                    return task;
+            }
 
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(StripeConcurrentQueueNoPark.class, this, super.toString());
-        }
-    }
+            for (;;) {
+                parked.set(true);
 
-    /**
-     * Stripe.
-     */
-    private static class StripeConcurrentBlockingQueue extends Stripe {
-        /** Queue. */
-        private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+                if ((task = queue.poll()) != null) {
+                    parked.set(false);
 
-        /**
-         * @param igniteInstanceName Ignite instance name.
-         * @param poolName Pool name.
-         * @param idx Stripe index.
-         * @param log Logger.
-         */
-        public StripeConcurrentBlockingQueue(
-            String igniteInstanceName,
-            String poolName,
-            int idx,
-            IgniteLogger log
-        ) {
-            super(igniteInstanceName,
-                poolName,
-                idx,
-                log);
-        }
+                    return task;
+                }
 
-        /** {@inheritDoc} */
-        @Override Runnable take() throws InterruptedException {
-            return queue.take();
-        }
+                LockSupport.park();
 
-        /** {@inheritDoc} */
-        void execute(Runnable cmd) {
-            queue.add(cmd);
+                if (Thread.interrupted())
+                    throw new InterruptedException();
+            }
         }
 
         /** {@inheritDoc} */
@@ -790,10 +765,5 @@ public class StripedExecutor implements ExecutorService {
         @Override String queueToString() {
             return String.valueOf(queue);
         }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(StripeConcurrentBlockingQueue.class, this, super.toString());
-        }
     }
 }