You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/11/07 09:02:18 UTC
ignite git commit: IGNITE-6809: Use MPSC queue in striped pool. This
closes #2960.
Repository: ignite
Updated Branches:
refs/heads/master b7f2a8f4a -> 6f94659b5
IGNITE-6809: Use MPSC queue in striped pool. This closes #2960.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6f94659b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6f94659b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6f94659b
Branch: refs/heads/master
Commit: 6f94659b53f056c176b3a3c6dd8f11a5db2ad74f
Parents: b7f2a8f
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Tue Nov 7 12:02:12 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Nov 7 12:02:12 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/util/MpscQueue.java | 240 +++++++++++++++
.../ignite/internal/util/StripedExecutor.java | 304 +++++++++----------
2 files changed, 377 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f94659b/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java
new file mode 100644
index 0000000..cc10124
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * MP-SC concurrent linked queue implementation based on Dmitry Vyukov's <a href="http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue">
+ * Non-intrusive MPSC node-based queue</a>.
+ * <p>
+ * The queue operations live in the superclass chain: {@code offer} is declared in {@link Tail}, while
+ * {@code poll}, {@code size} and {@code toString} are declared in {@link Head}. This class itself only adds
+ * trailing padding fields and the constructor that installs the initial placeholder node.
+ * <p>
+ * NOTE(review): "MP-SC" presumably means that many threads may call {@code offer} concurrently while only one
+ * thread at a time may call {@code poll}; nothing in the code enforces the single-consumer rule - confirm
+ * against the callers.
+ */
+@SuppressWarnings({"WeakerAccess", "PackageVisibleField", "unused"})
+public final class MpscQueue<E> extends Head<E> {
+ /** Padding: 16 never-read long fields (128 bytes). Presumably guards the inherited head field against false sharing - TODO confirm. */
+ long p00, p01, p02, p03, p04, p05, p06, p07;
+ long p10, p11, p12, p13, p14, p15, p16, p17;
+
+ public MpscQueue() { // Creates an empty queue.
+ Node node = new Node(null); // Valueless placeholder node; real elements get linked after it.
+
+ tail = node;
+ head = node; // Both ends start on the same placeholder, so the first poll() finds no successor and returns null.
+ }
+}
+
+/**
+ * Head of {@link MpscQueue}.
+ * <p>
+ * Holds the {@code head} reference and the read-side operations ({@link #poll()}, {@link #size()},
+ * {@link #toString()}). {@code head} always refers to a node without a value (the initial placeholder, or a
+ * node that {@link #poll()} has already emptied); the successor of that node, if linked, carries the first
+ * element of the queue.
+ */
+@SuppressWarnings({"WeakerAccess", "AtomicFieldUpdaterIssues", "ClassNameDiffersFromFileName"})
+abstract class Head<E> extends PaddingL1<E> {
+ /** Head field updater. Used by {@link #poll()} to advance {@code head} through {@code lazySet}. */
+ private static final AtomicReferenceFieldUpdater<Head, Node> updater =
+ AtomicReferenceFieldUpdater.newUpdater(Head.class, Node.class, "head");
+
+ /** Head. Valueless node whose successor (once linked) holds the first element. */
+ protected volatile Node head;
+
+ /**
+ * Poll element.
+ * <p>
+ * Takes the value out of the first element node, clears that node and makes it the new {@code head}.
+ * May busy-wait briefly inside {@link #peekNode()} while a concurrent offer finishes linking its node.
+ *
+ * @return Element, or {@code null} when there is no element node after {@code head}.
+ */
+ @SuppressWarnings("unchecked")
+ public E poll() {
+ Node node = peekNode();
+
+ if (node != null) {
+ Object val = node.value();
+
+ node.value(null); // Drop the reference: this node stays in the chain as the new valueless head.
+
+ updater.lazySet(this, node); // Advance head to the node that was just emptied.
+
+ return (E)val;
+ }
+ else
+ return null;
+ }
+
+
+ /**
+ * Counts elements by following the links from the first element node, so the cost grows with the
+ * number of queued elements. The walk ends at a missing node, at a node without a value, at a node linked
+ * to itself, or once the counter reaches {@link Integer#MAX_VALUE}.
+ * <p>
+ * NOTE(review): no snapshot is taken, so with concurrent offers or polls the result is presumably only
+ * an estimate - confirm that callers use it for monitoring only.
+ *
+ * @return queue size.
+ */
+ public int size() {
+ Node node = peekNode();
+
+ int size = 0;
+
+ for (;;) {
+ if (node == null || node.value() == null) // End of chain, or a node that carries no element.
+ break;
+
+ Node next = node.next();
+
+ if (node == next) // Self-linked node: stop instead of looping forever.
+ break;
+
+ node = next;
+
+ if (++size == Integer.MAX_VALUE) // Saturate instead of overflowing the int counter.
+ break;
+ }
+
+ return size;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Renders the queued values in link order as {@code [a, b, c]}; an empty queue yields {@code []}.
+ * The walk uses the same stop conditions as {@link #size()}, except for the counter limit.
+ */
+ public String toString() { // NOTE(review): overrides Object.toString() but carries no @Override annotation.
+ Node node = peekNode();
+
+ StringBuilder sb = new StringBuilder().append('[');
+
+ for (;;) {
+ if (node == null)
+ break;
+
+ Object value = node.value();
+
+ if (value == null)
+ break;
+
+ if(sb.length() > 1) // Length is 1 while only '[' has been written, so no separator precedes the first value.
+ sb.append(',').append(' ');
+
+ sb.append(value);
+
+ Node next = node.next();
+
+ if (node == next)
+ break;
+
+ node = next;
+ }
+
+ return sb.append(']').toString();
+ }
+
+ /**
+ * Reads {@code head} and returns its successor. When no successor is linked yet although {@code head} is
+ * not the current {@code tail}, the method busy-waits (no back-off, no parking) until the link shows up:
+ * {@code Tail.offer} swaps {@code tail} first and links the new node afterwards, so for a short window an
+ * offered node is not yet reachable from {@code head}.
+ *
+ * @return The node after the head of the queue (the first element in the queue), or {@code null} when
+ * {@code head} has no successor and is also the current {@code tail}.
+ */
+ private Node peekNode() {
+ Node head = this.head;
+ Node next = head.next();
+
+ if (next == null && head != tail) { // An offer is in flight: tail already moved, link not written yet.
+ do {
+ next = head.next();
+ } while (next == null);
+ }
+ return next;
+ }
+}
+
+/**
+ * Padding.
+ * <p>
+ * Sits between {@link Tail} (its superclass) and {@link Head} (its subclass) in the class hierarchy and
+ * declares nothing but unused {@code long} fields.
+ * <p>
+ * NOTE(review): presumably meant to keep the {@code tail} and {@code head} fields on different cache lines
+ * so that producers and the consumer do not invalidate each other's line; the effect depends on how the JVM
+ * lays out superclass fields - TODO confirm.
+ */
+@SuppressWarnings({"PackageVisibleField", "ClassNameDiffersFromFileName", "unused"})
+abstract class PaddingL1<E> extends Tail<E> {
+ /** Padding: 16 never-read long fields (128 bytes). */
+ long p00, p01, p02, p03, p04, p05, p06, p07;
+ long p10, p11, p12, p13, p14, p15, p16, p17;
+}
+
+/**
+ * Tail of {@link MpscQueue}.
+ * <p>
+ * Holds the {@code tail} reference (the most recently offered node, or the initial placeholder) and the
+ * write-side operation {@link #offer(Object)}.
+ */
+@SuppressWarnings({"ClassNameDiffersFromFileName", "AtomicFieldUpdaterIssues"})
+abstract class Tail<E> extends PaddingL0 {
+ /** Tail field updater. Used by {@link #offer(Object)} to swap {@code tail} atomically with {@code getAndSet}. */
+ private static final AtomicReferenceFieldUpdater<Tail, Node> updater =
+ AtomicReferenceFieldUpdater.newUpdater(Tail.class, Node.class, "tail");
+
+ /** Tail. Last node of the chain; replaced on every offer. */
+ protected volatile Node tail;
+
+ /**
+ * Offer element.
+ * <p>
+ * Works in two steps: the new node first replaces {@code tail} atomically, and only then the previous
+ * tail node is linked to it. Between the two steps the new node is not reachable from {@code head};
+ * the reading side ({@code Head.peekNode}) spins until the link has been written.
+ *
+ * @param e Element.
+ * @throws IllegalArgumentException If {@code e} is {@code null}.
+ */
+ public void offer(final E e) {
+ if (e == null)
+ throw new IllegalArgumentException("Null are not allowed.");
+
+ Node newTail = new Node(e);
+
+ Node prevTail = updater.getAndSet(this, newTail); // Step 1: claim the tail slot; returns the node that was last before.
+
+ prevTail.next(newTail); // Step 2: link the predecessor, which makes the new node visible to the reader.
+ }
+}
+
+/**
+ * Padding.
+ * <p>
+ * Root of the queue class hierarchy ({@link Tail} extends this class); declares nothing but unused
+ * {@code long} fields.
+ * <p>
+ * NOTE(review): presumably meant to separate the {@code tail} field from whatever precedes the queue object
+ * in memory (false-sharing protection); the actual layout is up to the JVM - TODO confirm.
+ */
+@SuppressWarnings({"PackageVisibleField", "ClassNameDiffersFromFileName", "unused"})
+abstract class PaddingL0 {
+ /** Padding: 16 never-read long fields (128 bytes). */
+ long p00, p01, p02, p03, p04, p05, p06, p07;
+ long p10, p11, p12, p13, p14, p15, p16, p17;
+}
+
+@SuppressWarnings({"UnusedDeclaration", "ClassNameDiffersFromFileName"})
+final class Node { // Singly linked queue node: one value plus a reference to the next node.
+ /** Next field updater. Lets {@link #next(Node)} write the link through {@code lazySet}. */
+ private static final AtomicReferenceFieldUpdater<Node, Node> updater =
+ AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");
+
+ /** Value. Plain (non-volatile) field; {@code null} for the placeholder node and for nodes already polled. */
+ private Object val;
+
+ /** Next node. Stays {@code null} until a successor is linked through {@link #next(Node)}. */
+ private volatile Node next;
+
+ /**
+ * Constructor.
+ * <p>
+ * The value is stored before the node is handed to any other code; NOTE(review): readers presumably rely
+ * on the later write of the predecessor's {@code next} link to make this plain write visible - confirm.
+ *
+ * @param val Value.
+ */
+ Node(Object val) {
+ this.val = val;
+ }
+
+ /**
+ * Set next node.
+ * <p>
+ * The link is written with {@code lazySet} instead of a direct assignment to the volatile field.
+ *
+ * @param next Next node.
+ */
+ void next(Node next) {
+ updater.lazySet(this, next);
+ }
+
+ /** @return Value held by this node; may be {@code null}. */
+ Object value() {
+ return val;
+ }
+
+ void value(Object val) { // Replaces the stored value; Head.poll() passes null here to release the polled element.
+ this.val = val;
+ }
+
+ /** @return Next node, or {@code null} if no successor has been linked yet. */
+ Node next() {
+ return next;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f94659b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 630d34c..1f9b08d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -22,34 +22,36 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
+import java.util.Random;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.NotNull;
+import org.jsr166.ConcurrentLinkedDeque8;
/**
* Striped executor.
*/
public class StripedExecutor implements ExecutorService {
+ /** */
+ private static final int SPIN_CNT = 2048;
+
/** Stripes. */
private final Stripe[] stripes;
@@ -76,12 +78,11 @@ public class StripedExecutor implements ExecutorService {
* @param log Logger.
* @param stealTasks {@code True} to steal tasks.
*/
- public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log, boolean stealTasks) {
+ public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log,
+ boolean stealTasks) {
A.ensure(cnt > 0, "cnt > 0");
- boolean success = false;
-
- stripes = new Stripe[cnt];
+ stripes = stealTasks ? StealingStripe.create(igniteInstanceName, poolName, log, cnt) : new Stripe[cnt];
completedCntrs = new long[cnt];
@@ -89,17 +90,12 @@ public class StripedExecutor implements ExecutorService {
this.log = log;
+ boolean success = false;
+
try {
- for (int i = 0; i < cnt; i++) {
- stripes[i] = stealTasks ? new StripeConcurrentQueue(
- igniteInstanceName,
- poolName,
- i,
- log, stripes) : new StripeConcurrentQueue(
- igniteInstanceName,
- poolName,
- i,
- log);
+ if(!stealTasks) {
+ for (int i = 0; i < cnt; i++)
+ stripes[i] = new MpscQueueBasedStripe(igniteInstanceName, poolName, i, log);
}
for (int i = 0; i < cnt; i++)
@@ -409,7 +405,7 @@ public class StripedExecutor implements ExecutorService {
/**
* Stripe.
*/
- private static abstract class Stripe implements Runnable {
+ private abstract static class Stripe implements Runnable {
/** */
private final String igniteInstanceName;
@@ -440,7 +436,7 @@ public class StripedExecutor implements ExecutorService {
* @param idx Stripe index.
* @param log Logger.
*/
- public Stripe(
+ Stripe(
String igniteInstanceName,
String poolName,
int idx,
@@ -493,10 +489,8 @@ public class StripedExecutor implements ExecutorService {
/** {@inheritDoc} */
@Override public void run() {
while (!stopping) {
- Runnable cmd;
-
try {
- cmd = take();
+ Runnable cmd = take();
if (cmd != null) {
active = true;
@@ -552,52 +546,79 @@ public class StripedExecutor implements ExecutorService {
}
}
- /**
- * Stripe.
- */
- private static class StripeConcurrentQueue extends Stripe {
+ /** */
+ private static final class StealingStripe extends Stripe {
/** */
private static final int IGNITE_TASKS_STEALING_THRESHOLD =
IgniteSystemProperties.getInteger(
IgniteSystemProperties.IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD, 4);
- /** Queue. */
- private final Queue<Runnable> queue;
+ /** */
+ @GridToStringExclude
+ private final Deque<Runnable>[] queues;
+
+ /** */
+ @GridToStringExclude
+ private final IgniteRunnable unpark;
/** */
@GridToStringExclude
- private final Stripe[] others;
+ private Random rnd;
/** */
- private volatile boolean parked;
+ private final Deque<Runnable> queue;
- /**
- * @param igniteInstanceName Ignite instance name.
- * @param poolName Pool name.
- * @param idx Stripe index.
- * @param log Logger.
- */
- StripeConcurrentQueue(
- String igniteInstanceName,
- String poolName,
- int idx,
- IgniteLogger log
- ) {
- this(igniteInstanceName, poolName, idx, log, null);
+ /** */
+ private final AtomicBoolean parked = new AtomicBoolean();
+
+ /**
+ * Builds the whole pool of stealing stripes in one go.
+ * <p>
+ * Allocates one {@code ConcurrentLinkedDeque8} per stripe and passes every stripe the complete array of
+ * deques plus one shared {@code unpark} callback. The callback picks a random start index and scans the
+ * stripes circularly: the first stripe whose {@code parked} flag it flips from {@code true} to
+ * {@code false} gets its thread unparked and the scan stops; if no flag could be flipped, the scan stops
+ * after one full circle without unparking anything.
+ *
+ * @param igniteInstanceName Ignite instance name.
+ * @param poolName Pool name.
+ * @param log Logger.
+ * @param poolSize Number of stripes to create.
+ * @return Array holding {@code poolSize} newly created stripes.
+ */
+ @SuppressWarnings("unchecked")
+ static Stripe[] create(String igniteInstanceName, String poolName, IgniteLogger log, final int poolSize) {
+ final StealingStripe[] stripes = new StealingStripe[poolSize];
+ Deque<Runnable>[] queues = new Deque[poolSize]; // Raw array creation - presumably the reason for the "unchecked" suppression.
+
+ IgniteRunnable unpark = new IgniteRunnable() {
+ @Override public void run() {
+ int init = ThreadLocalRandom.current().nextInt(poolSize); // Random start index for the circular scan.
+
+ for (int cur = init;;) {
+ AtomicBoolean parked = stripes[cur].parked;
+
+ if (parked.get() && parked.compareAndSet(true, false)) { // Only the caller that wins the CAS unparks the thread.
+ LockSupport.unpark(stripes[cur].thread);
+
+ break;
+ }
+
+ if ((cur = (cur + 1) % poolSize) == init) // Back at the start index: nothing could be unparked.
+ break;
+ }
+ }
+ };
+
+ for (int i = 0; i < poolSize; i++) {
+ queues[i] = new ConcurrentLinkedDeque8<>();
+ stripes[i] = new StealingStripe(i, igniteInstanceName, poolName, log, queues, unpark); // Every stripe shares the same queues array and callback.
+ }
+
+ return stripes;
}
/**
+ * @param idx Stripe index.
* @param igniteInstanceName Ignite instance name.
* @param poolName Pool name.
- * @param idx Stripe index.
* @param log Logger.
+ * @param queues Other queues to steal tasks from.
+ * @param unpark Unpark callback, unparks random parked stripe from the pool.
*/
- StripeConcurrentQueue(
+ private StealingStripe(
+ int idx,
String igniteInstanceName,
String poolName,
- int idx,
IgniteLogger log,
- Stripe[] others
+ Deque<Runnable>[] queues,
+ IgniteRunnable unpark
) {
super(
igniteInstanceName,
@@ -605,73 +626,62 @@ public class StripedExecutor implements ExecutorService {
idx,
log);
- this.others = others;
+ this.queues = queues;
+ this.unpark = unpark;
- this.queue = others == null ? new ConcurrentLinkedQueue<Runnable>() : new ConcurrentLinkedDeque<Runnable>();
+ queue = queues[idx];
}
/** {@inheritDoc} */
@Override Runnable take() throws InterruptedException {
- Runnable r;
-
- for (int i = 0; i < 2048; i++) {
- r = queue.poll();
+ Runnable task;
- if (r != null)
- return r;
+ for (int i = 0; i < SPIN_CNT; i++) {
+ if ((task = queue.poll()) != null)
+ return task;
}
- parked = true;
+ for (;;) {
+ parked.set(true);
- try {
- for (;;) {
- r = queue.poll();
+ if ((task = queue.poll()) != null) {
+ parked.set(false);
- if (r != null)
- return r;
+ return task;
+ }
- if(others != null) {
- int len = others.length;
- int init = ThreadLocalRandom.current().nextInt(len);
- int cur = init;
+ int len = queues.length, init = random().nextInt(len);
- while (true) {
- if(cur != idx) {
- Deque<Runnable> queue = (Deque<Runnable>) ((StripeConcurrentQueue) others[cur]).queue;
+ for (int cur = init;;) {
+ if(cur != idx) {
+ Deque<Runnable> queue = queues[cur];
- if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD && (r = queue.pollLast()) != null)
- return r;
- }
+ if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD && (task = queue.pollLast()) != null) {
+ parked.set(false);
- if ((cur = (cur + 1) % len) == init)
- break;
+ return task;
}
}
- LockSupport.park();
-
- if (Thread.interrupted())
- throw new InterruptedException();
+ if ((cur = (cur + 1) % len) == init)
+ break;
}
- }
- finally {
- parked = false;
+
+ LockSupport.park();
+
+ if (Thread.interrupted())
+ throw new InterruptedException();
}
}
/** {@inheritDoc} */
- void execute(Runnable cmd) {
+ @Override void execute(Runnable cmd) {
queue.add(cmd);
- if (parked)
+ if (parked.get() && parked.compareAndSet(true, false))
LockSupport.unpark(thread);
-
- if(others != null && queueSize() > IGNITE_TASKS_STEALING_THRESHOLD) {
- for (Stripe other : others) {
- if(((StripeConcurrentQueue)other).parked)
- LockSupport.unpark(other.thread);
- }
- }
+ else if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD)
+ unpark.run();
}
/** {@inheritDoc} */
@@ -684,18 +694,24 @@ public class StripedExecutor implements ExecutorService {
return queue.size();
}
+ /**
+ * Returns the random generator of this stripe; the first call stores {@code ThreadLocalRandom.current()}
+ * in the {@code rnd} field and later calls return that stored instance.
+ * <p>
+ * NOTE(review): the stored generator belongs to the thread that made the first call; presumably only
+ * the stripe's own worker thread ever calls this method (through {@code take()}) - confirm.
+ *
+ * @return Random generator.
+ */
+ private Random random() {
+ return rnd == null ? rnd = ThreadLocalRandom.current() : rnd;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(StripeConcurrentQueue.class, this, super.toString());
+ return S.toString(StealingStripe.class, this, super.toString());
}
}
- /**
- * Stripe.
- */
- private static class StripeConcurrentQueueNoPark extends Stripe {
- /** Queue. */
- private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
+ /** */
+ private static final class MpscQueueBasedStripe extends Stripe {
+ /** */
+ private final AtomicBoolean parked = new AtomicBoolean();
+
+ /** */
+ private final MpscQueue<Runnable> queue = new MpscQueue<>();
/**
* @param igniteInstanceName Ignite instance name.
@@ -703,82 +719,41 @@ public class StripedExecutor implements ExecutorService {
* @param idx Stripe index.
* @param log Logger.
*/
- public StripeConcurrentQueueNoPark(
- String igniteInstanceName,
- String poolName,
- int idx,
- IgniteLogger log
- ) {
- super(igniteInstanceName,
- poolName,
- idx,
- log);
+ private MpscQueueBasedStripe(String igniteInstanceName, String poolName, int idx, IgniteLogger log) {
+ super(igniteInstanceName, poolName, idx, log);
}
/** {@inheritDoc} */
- @Override Runnable take() {
- for (;;) {
- Runnable r = queue.poll();
-
- if (r != null)
- return r;
- }
- }
+ @Override void execute(Runnable cmd) {
+ queue.offer(cmd);
- /** {@inheritDoc} */
- void execute(Runnable cmd) {
- queue.add(cmd);
+ if (parked.get() && parked.compareAndSet(true, false))
+ LockSupport.unpark(thread);
}
/** {@inheritDoc} */
- @Override int queueSize() {
- return queue.size();
- }
+ @Override Runnable take() throws InterruptedException {
+ Runnable task;
- /** {@inheritDoc} */
- @Override String queueToString() {
- return String.valueOf(queue);
- }
+ for (int i = 0; i < SPIN_CNT; i++) {
+ if ((task = queue.poll()) != null)
+ return task;
+ }
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(StripeConcurrentQueueNoPark.class, this, super.toString());
- }
- }
+ for (;;) {
+ parked.set(true);
- /**
- * Stripe.
- */
- private static class StripeConcurrentBlockingQueue extends Stripe {
- /** Queue. */
- private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+ if ((task = queue.poll()) != null) {
+ parked.set(false);
- /**
- * @param igniteInstanceName Ignite instance name.
- * @param poolName Pool name.
- * @param idx Stripe index.
- * @param log Logger.
- */
- public StripeConcurrentBlockingQueue(
- String igniteInstanceName,
- String poolName,
- int idx,
- IgniteLogger log
- ) {
- super(igniteInstanceName,
- poolName,
- idx,
- log);
- }
+ return task;
+ }
- /** {@inheritDoc} */
- @Override Runnable take() throws InterruptedException {
- return queue.take();
- }
+ LockSupport.park();
- /** {@inheritDoc} */
- void execute(Runnable cmd) {
- queue.add(cmd);
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ }
}
/** {@inheritDoc} */
@@ -790,10 +765,5 @@ public class StripedExecutor implements ExecutorService {
@Override String queueToString() {
return String.valueOf(queue);
}
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(StripeConcurrentBlockingQueue.class, this, super.toString());
- }
}
}