You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/01/23 14:53:45 UTC
ignite git commit: new queue in striped pool
Repository: ignite
Updated Branches:
refs/heads/ignite-comm-balance-master 6951b20f4 -> abb95fac5
new queue in striped pool
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/abb95fac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/abb95fac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/abb95fac
Branch: refs/heads/ignite-comm-balance-master
Commit: abb95fac5c15c46416e58331c4f249ebe10d981b
Parents: 6951b20
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Mon Jan 23 17:52:30 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Mon Jan 23 17:52:30 2017 +0300
----------------------------------------------------------------------
.../ignite/internal/client/util/MpscQueue.java | 289 +++++++++++++++++++
.../ignite/internal/util/StripedExecutor.java | 42 ++-
2 files changed, 330 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/abb95fac/modules/core/src/main/java/org/apache/ignite/internal/client/util/MpscQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/util/MpscQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/client/util/MpscQueue.java
new file mode 100644
index 0000000..8821f66
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/util/MpscQueue.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.util;
+
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.util.typedef.internal.A;
+
+import static java.util.concurrent.locks.LockSupport.park;
+import static java.util.concurrent.locks.LockSupport.unpark;
+
+/**
+ * Multi producer single consumer queue.
+ */
+public class MpscQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
+ static final int INITIAL_ARRAY_SIZE = 512;
+ static final Node BLOCKED = new Node();
+
+ final AtomicReference<Node> putStack = new AtomicReference<Node>();
+ private final AtomicInteger takeStackSize = new AtomicInteger();
+
+ private Thread consumerThread;
+ private Object[] takeStack = new Object[INITIAL_ARRAY_SIZE];
+ private int takeStackIndex = -1;
+
+ static int nextPowerOfTwo(final int value) {
+ return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
+ }
+
+ public void setConsumerThread(Thread consumerThread) {
+ this.consumerThread = consumerThread;
+ }
+
+ /**
+ * {@inheritDoc}.
+ *
+ * This call is threadsafe; but it will only remove the items that are on the put-stack.
+ */
+ @Override
+ public void clear() {
+ putStack.set(BLOCKED);
+ }
+
+ @Override
+ public boolean offer(E item) {
+ A.notNull(item, "item can't be null");
+
+ AtomicReference<Node> putStack = this.putStack;
+ Node newHead = new Node();
+ newHead.item = item;
+
+ for (; ; ) {
+ Node oldHead = putStack.get();
+ if (oldHead == null || oldHead == BLOCKED) {
+ newHead.next = null;
+ newHead.size = 1;
+ } else {
+ newHead.next = oldHead;
+ newHead.size = oldHead.size + 1;
+ }
+
+ if (!putStack.compareAndSet(oldHead, newHead)) {
+ continue;
+ }
+
+ if (oldHead == BLOCKED) {
+ unpark(consumerThread);
+ }
+
+ return true;
+ }
+ }
+
+ @Override
+ public E peek() {
+ E item = peekNext();
+ if (item != null) {
+ return item;
+ }
+ if (!drainPutStack()) {
+ return null;
+ }
+ return peekNext();
+ }
+
+ @Override
+ public E take() throws InterruptedException {
+ E item = next();
+ if (item != null) {
+ return item;
+ }
+
+ takeAll();
+ assert takeStackIndex == 0;
+ assert takeStack[takeStackIndex] != null;
+
+ return next();
+ }
+
+ @Override
+ public E poll() {
+ E item = next();
+
+ if (item != null) {
+ return item;
+ }
+
+ if (!drainPutStack()) {
+ return null;
+ }
+
+ return next();
+ }
+
+ private E next() {
+ E item = peekNext();
+ if (item != null) {
+ dequeue();
+ }
+ return item;
+ }
+
+ private E peekNext() {
+ if (takeStackIndex == -1) {
+ return null;
+ }
+
+ if (takeStackIndex == takeStack.length) {
+ takeStackIndex = -1;
+ return null;
+ }
+
+ E item = (E) takeStack[takeStackIndex];
+ if (item == null) {
+ takeStackIndex = -1;
+ return null;
+ }
+ return item;
+ }
+
+ private void dequeue() {
+ takeStack[takeStackIndex] = null;
+ takeStackIndex++;
+ takeStackSize.lazySet(takeStackSize.get() - 1);
+ }
+
+ private void takeAll() throws InterruptedException {
+ AtomicReference<Node> putStack = this.putStack;
+ for (; ; ) {
+ if (consumerThread.isInterrupted()) {
+ putStack.compareAndSet(BLOCKED, null);
+ throw new InterruptedException();
+ }
+
+ Node currentPutStackHead = putStack.get();
+
+ if (currentPutStackHead == null) {
+ // there is nothing to be take, so lets block.
+ if (!putStack.compareAndSet(null, BLOCKED)) {
+ // we are lucky, something is available
+ continue;
+ }
+
+ // lets block for real.
+ park();
+ } else if (currentPutStackHead == BLOCKED) {
+ park();
+ } else {
+ if (!putStack.compareAndSet(currentPutStackHead, null)) {
+ continue;
+ }
+
+ copyIntoTakeStack(currentPutStackHead);
+ break;
+ }
+ }
+ }
+
+ private boolean drainPutStack() {
+ for (; ; ) {
+ Node head = putStack.get();
+ if (head == null) {
+ return false;
+ }
+
+ if (putStack.compareAndSet(head, null)) {
+ copyIntoTakeStack(head);
+ return true;
+ }
+ }
+ }
+
+ private void copyIntoTakeStack(Node putStackHead) {
+ int putStackSize = putStackHead.size;
+
+ takeStackSize.lazySet(putStackSize);
+
+ if (putStackSize > takeStack.length) {
+ takeStack = new Object[nextPowerOfTwo(putStackHead.size)];
+ }
+
+ for (int i = putStackSize - 1; i >= 0; i--) {
+ takeStack[i] = putStackHead.item;
+ putStackHead = putStackHead.next;
+ }
+
+ takeStackIndex = 0;
+ assert takeStack[0] != null;
+ }
+
+ /**
+ * {@inheritDoc}.
+ *
+ * Best effort implementation.
+ */
+ @Override
+ public int size() {
+ Node h = putStack.get();
+ int putStackSize = h == null ? 0 : h.size;
+ return putStackSize + takeStackSize.get();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return size() == 0;
+ }
+
+ @Override
+ public void put(E e) throws InterruptedException {
+ offer(e);
+ }
+
+ @Override
+ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
+ add(e);
+ return true;
+ }
+
+ @Override
+ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int remainingCapacity() {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public int drainTo(Collection<? super E> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int drainTo(Collection<? super E> c, int maxElements) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ throw new UnsupportedOperationException();
+ }
+
+ private static final class Node<E> {
+ Node next;
+ E item;
+ int size;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/abb95fac/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index a653429..39dde09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.client.util.MpscQueue;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -74,7 +75,7 @@ public class StripedExecutor implements ExecutorService {
try {
for (int i = 0; i < cnt; i++) {
- stripes[i] = new StripeConcurrentQueue(
+ stripes[i] = new StripeMPSC(
gridName,
poolName,
i,
@@ -475,6 +476,45 @@ public class StripedExecutor implements ExecutorService {
}
}
+ private static class StripeMPSC extends Stripe {
+ private final MpscQueue<Runnable> q = new MpscQueue<>();
+
+ public StripeMPSC(
+ String gridName,
+ String poolName,
+ int idx,
+ IgniteLogger log
+ ) {
+ super(
+ gridName,
+ poolName,
+ idx,
+ log);
+ }
+
+ @Override void start() {
+ super.start();
+
+ q.setConsumerThread(thread);
+ }
+
+ @Override void execute(Runnable cmd) {
+ q.offer(cmd);
+ }
+
+ @Override Runnable take() throws InterruptedException {
+ return q.take();
+ }
+
+ @Override int queueSize() {
+ return q.size();
+ }
+
+ @Override String queueToString() {
+ return q.toString();
+ }
+ }
+
/**
* Stripe.
*/