You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/01/23 14:53:45 UTC

ignite git commit: new queue in striped pool

Repository: ignite
Updated Branches:
  refs/heads/ignite-comm-balance-master 6951b20f4 -> abb95fac5


new queue in striped pool


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/abb95fac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/abb95fac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/abb95fac

Branch: refs/heads/ignite-comm-balance-master
Commit: abb95fac5c15c46416e58331c4f249ebe10d981b
Parents: 6951b20
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Mon Jan 23 17:52:30 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Mon Jan 23 17:52:30 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/client/util/MpscQueue.java  | 289 +++++++++++++++++++
 .../ignite/internal/util/StripedExecutor.java   |  42 ++-
 2 files changed, 330 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/abb95fac/modules/core/src/main/java/org/apache/ignite/internal/client/util/MpscQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/util/MpscQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/client/util/MpscQueue.java
new file mode 100644
index 0000000..8821f66
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/util/MpscQueue.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.util;
+
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.util.typedef.internal.A;
+
+import static java.util.concurrent.locks.LockSupport.park;
+import static java.util.concurrent.locks.LockSupport.unpark;
+
+/**
+ * Multi producer single consumer queue.
+ */
+public class MpscQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
+    static final int INITIAL_ARRAY_SIZE = 512;
+    static final Node BLOCKED = new Node();
+
+    final AtomicReference<Node> putStack = new AtomicReference<Node>();
+    private final AtomicInteger takeStackSize = new AtomicInteger();
+
+    private Thread consumerThread;
+    private Object[] takeStack = new Object[INITIAL_ARRAY_SIZE];
+    private int takeStackIndex = -1;
+
+    static int nextPowerOfTwo(final int value) {
+        return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
+    }
+
+    public void setConsumerThread(Thread consumerThread) {
+        this.consumerThread = consumerThread;
+    }
+
+    /**
+     * {@inheritDoc}.
+     *
+     * This call is threadsafe; but it will only remove the items that are on the put-stack.
+     */
+    @Override
+    public void clear() {
+        putStack.set(BLOCKED);
+    }
+
+    @Override
+    public boolean offer(E item) {
+        A.notNull(item, "item can't be null");
+
+        AtomicReference<Node> putStack = this.putStack;
+        Node newHead = new Node();
+        newHead.item = item;
+
+        for (; ; ) {
+            Node oldHead = putStack.get();
+            if (oldHead == null || oldHead == BLOCKED) {
+                newHead.next = null;
+                newHead.size = 1;
+            } else {
+                newHead.next = oldHead;
+                newHead.size = oldHead.size + 1;
+            }
+
+            if (!putStack.compareAndSet(oldHead, newHead)) {
+                continue;
+            }
+
+            if (oldHead == BLOCKED) {
+                unpark(consumerThread);
+            }
+
+            return true;
+        }
+    }
+
+    @Override
+    public E peek() {
+        E item = peekNext();
+        if (item != null) {
+            return item;
+        }
+        if (!drainPutStack()) {
+            return null;
+        }
+        return peekNext();
+    }
+
+    @Override
+    public E take() throws InterruptedException {
+        E item = next();
+        if (item != null) {
+            return item;
+        }
+
+        takeAll();
+        assert takeStackIndex == 0;
+        assert takeStack[takeStackIndex] != null;
+
+        return next();
+    }
+
+    @Override
+    public E poll() {
+        E item = next();
+
+        if (item != null) {
+            return item;
+        }
+
+        if (!drainPutStack()) {
+            return null;
+        }
+
+        return next();
+    }
+
+    private E next() {
+        E item = peekNext();
+        if (item != null) {
+            dequeue();
+        }
+        return item;
+    }
+
+    private E peekNext() {
+        if (takeStackIndex == -1) {
+            return null;
+        }
+
+        if (takeStackIndex == takeStack.length) {
+            takeStackIndex = -1;
+            return null;
+        }
+
+        E item = (E) takeStack[takeStackIndex];
+        if (item == null) {
+            takeStackIndex = -1;
+            return null;
+        }
+        return item;
+    }
+
+    private void dequeue() {
+        takeStack[takeStackIndex] = null;
+        takeStackIndex++;
+        takeStackSize.lazySet(takeStackSize.get() - 1);
+    }
+
+    private void takeAll() throws InterruptedException {
+        AtomicReference<Node> putStack = this.putStack;
+        for (; ; ) {
+            if (consumerThread.isInterrupted()) {
+                putStack.compareAndSet(BLOCKED, null);
+                throw new InterruptedException();
+            }
+
+            Node currentPutStackHead = putStack.get();
+
+            if (currentPutStackHead == null) {
+                // there is nothing to be take, so lets block.
+                if (!putStack.compareAndSet(null, BLOCKED)) {
+                    // we are lucky, something is available
+                    continue;
+                }
+
+                // lets block for real.
+                park();
+            } else if (currentPutStackHead == BLOCKED) {
+                park();
+            } else {
+                if (!putStack.compareAndSet(currentPutStackHead, null)) {
+                    continue;
+                }
+
+                copyIntoTakeStack(currentPutStackHead);
+                break;
+            }
+        }
+    }
+
+    private boolean drainPutStack() {
+        for (; ; ) {
+            Node head = putStack.get();
+            if (head == null) {
+                return false;
+            }
+
+            if (putStack.compareAndSet(head, null)) {
+                copyIntoTakeStack(head);
+                return true;
+            }
+        }
+    }
+
+    private void copyIntoTakeStack(Node putStackHead) {
+        int putStackSize = putStackHead.size;
+
+        takeStackSize.lazySet(putStackSize);
+
+        if (putStackSize > takeStack.length) {
+            takeStack = new Object[nextPowerOfTwo(putStackHead.size)];
+        }
+
+        for (int i = putStackSize - 1; i >= 0; i--) {
+            takeStack[i] = putStackHead.item;
+            putStackHead = putStackHead.next;
+        }
+
+        takeStackIndex = 0;
+        assert takeStack[0] != null;
+    }
+
+    /**
+     * {@inheritDoc}.
+     *
+     * Best effort implementation.
+     */
+    @Override
+    public int size() {
+        Node h = putStack.get();
+        int putStackSize = h == null ? 0 : h.size;
+        return putStackSize + takeStackSize.get();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return size() == 0;
+    }
+
+    @Override
+    public void put(E e) throws InterruptedException {
+        offer(e);
+    }
+
+    @Override
+    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
+        add(e);
+        return true;
+    }
+
+    @Override
+    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int remainingCapacity() {
+        return Integer.MAX_VALUE;
+    }
+
+    @Override
+    public int drainTo(Collection<? super E> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int drainTo(Collection<? super E> c, int maxElements) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Iterator<E> iterator() {
+        throw new UnsupportedOperationException();
+    }
+
+    private static final class Node<E> {
+        Node next;
+        E item;
+        int size;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/abb95fac/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index a653429..39dde09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.client.util.MpscQueue;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -74,7 +75,7 @@ public class StripedExecutor implements ExecutorService {
 
         try {
             for (int i = 0; i < cnt; i++) {
-                stripes[i] = new StripeConcurrentQueue(
+                stripes[i] = new StripeMPSC(
                     gridName,
                     poolName,
                     i,
@@ -475,6 +476,45 @@ public class StripedExecutor implements ExecutorService {
         }
     }
 
+    private static class StripeMPSC extends Stripe {
+        private final MpscQueue<Runnable> q = new MpscQueue<>();
+
+        public StripeMPSC(
+            String gridName,
+            String poolName,
+            int idx,
+            IgniteLogger log
+        ) {
+            super(
+                gridName,
+                poolName,
+                idx,
+                log);
+        }
+
+        @Override void start() {
+            super.start();
+
+            q.setConsumerThread(thread);
+        }
+
+        @Override void execute(Runnable cmd) {
+            q.offer(cmd);
+        }
+
+        @Override Runnable take() throws InterruptedException {
+            return q.take();
+        }
+
+        @Override int queueSize() {
+            return q.size();
+        }
+
+        @Override String queueToString() {
+            return q.toString();
+        }
+    }
+
     /**
      * Stripe.
      */