You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/12/29 11:07:21 UTC
[1/2] ignite git commit: IGNITE-2314: WIP.
Repository: ignite
Updated Branches:
refs/heads/ignite-2314 [created] 4f55c84f0
IGNITE-2314: WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f8011bb5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f8011bb5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f8011bb5
Branch: refs/heads/ignite-2314
Commit: f8011bb5b1a34e8c3e7b1ccca022f19c6824f1cb
Parents: 4a1a80c
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Dec 29 12:51:26 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Dec 29 12:51:26 2015 +0300
----------------------------------------------------------------------
.../util/ManyToOneConcurrentLinkedQueue.java | 90 ++++++++++++++++++++
1 file changed, 90 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f8011bb5/modules/core/src/main/java/org/apache/ignite/internal/util/ManyToOneConcurrentLinkedQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ManyToOneConcurrentLinkedQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ManyToOneConcurrentLinkedQueue.java
new file mode 100644
index 0000000..d30a5da
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ManyToOneConcurrentLinkedQueue.java
@@ -0,0 +1,90 @@
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ *
+ */
+public class ManyToOneConcurrentLinkedQueue<E>
+{
+ protected static final AtomicReferenceFieldUpdater<ManyToOneConcurrentLinkedQueue, Node> TAIL_UPD =
+ AtomicReferenceFieldUpdater.newUpdater(ManyToOneConcurrentLinkedQueue.class, Node.class, "tail");
+
+ protected volatile ManyToOneConcurrentLinkedQueue.Node tail;
+
+ private Node head;
+
+ public ManyToOneConcurrentLinkedQueue()
+ {
+ head = new Node(null);
+ TAIL_UPD.lazySet(this, head);
+ }
+
+ public boolean offer(final E e)
+ {
+ if (null == e)
+ {
+ throw new NullPointerException("element cannot be null");
+ }
+
+ final Node newTail = new Node(e);
+ final Node prevTail = swapTail(newTail);
+ prevTail.setNextOrdered(newTail);
+
+ return true;
+ }
+
+ public E poll()
+ {
+ Object value = null;
+
+ final Node node = head.next;
+
+ if (null != node)
+ {
+ value = node.value;
+ node.value = null;
+ head = node;
+ }
+
+ return (E)value;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Node swapTail(final Node newTail)
+ {
+ return TAIL_UPD.getAndSet(this, newTail);
+ }
+
+ /**
+ * Node with data.
+ */
+ private static class Node
+ {
+ public static final AtomicReferenceFieldUpdater<Node, Node> NODE_UPD =
+ AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");
+
+ Object value;
+ volatile Node next;
+
+ /**
+ * Constructor.
+ *
+ * @param value Value.
+ */
+ private Node(Object value)
+ {
+ this.value = value;
+ }
+
+ /**
+ * Set next node.
+ *
+ * @param next Next node.
+ */
+ void setNextOrdered(Node next)
+ {
+ NODE_UPD.lazySet(this, next);
+ }
+ }
+}
\ No newline at end of file
[2/2] ignite git commit: IGNITE-2314: Implemented.
Posted by vo...@apache.org.
IGNITE-2314: Implemented.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4f55c84f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4f55c84f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4f55c84f
Branch: refs/heads/ignite-2314
Commit: 4f55c84f0cca6946165d004e75157a847628d123
Parents: f8011bb
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Dec 29 13:08:09 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Dec 29 13:08:09 2015 +0300
----------------------------------------------------------------------
.../util/MPSCConcurrentLinkedQueue.java | 127 +++++++++++++++++++
.../util/ManyToOneConcurrentLinkedQueue.java | 90 -------------
.../ignite/internal/util/nio/GridNioServer.java | 5 +-
3 files changed, 130 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4f55c84f/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCConcurrentLinkedQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCConcurrentLinkedQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCConcurrentLinkedQueue.java
new file mode 100644
index 0000000..1990732
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCConcurrentLinkedQueue.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * MP-SC concurrent linked queue implementation based on Dmitry Vyukov's
+ * <a href="http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue">
+ * Non-intrusive MPSC node-based queue</a>.
+ */
+public class MPSCConcurrentLinkedQueue<E>
+{
+ /** Tail field updater. */
+ private static final AtomicReferenceFieldUpdater<MPSCConcurrentLinkedQueue, Node> TAIL_UPD =
+ AtomicReferenceFieldUpdater.newUpdater(MPSCConcurrentLinkedQueue.class, Node.class, "tail");
+
+ /** Head. */
+ private Node head;
+
+ /** Tail. */
+ @SuppressWarnings({"UnusedDeclaration", "FieldCanBeLocal"})
+ private volatile Node tail;
+
+ /**
+ * Constructor.
+ */
+ public MPSCConcurrentLinkedQueue()
+ {
+ head = new Node(null);
+
+ tail = head;
+ }
+
+ /**
+ * Offer element.
+ *
+ * @param e Element.
+ */
+ public void offer(final E e)
+ {
+ if (e == null)
+ throw new IllegalArgumentException("Null are not allowed.");
+
+ Node newTail = new Node(e);
+
+ Node prevTail = TAIL_UPD.getAndSet(this, newTail);
+
+ prevTail.setNext(newTail);
+ }
+
+ /**
+ * Poll element.
+ *
+ * @return Element.
+ */
+ @SuppressWarnings("unchecked")
+ public E poll()
+ {
+ final Node node = head.next;
+
+ if (node != null)
+ {
+ Object val = node.val;
+
+ node.val = null;
+
+ head = node;
+
+ return (E)val;
+ }
+ else
+ return null;
+ }
+
+ /**
+ * Node with data.
+ */
+ private static class Node
+ {
+ /** Next field updater. */
+ public static final AtomicReferenceFieldUpdater<Node, Node> NODE_UPD =
+ AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");
+
+ /** Value. */
+ private Object val;
+
+ /** Next node. */
+ @SuppressWarnings("UnusedDeclaration")
+ private volatile Node next;
+
+ /**
+ * Constructor.
+ *
+ * @param val Value.
+ */
+ private Node(Object val)
+ {
+ this.val = val;
+ }
+
+ /**
+ * Set next node.
+ *
+ * @param next Next node.
+ */
+ void setNext(Node next)
+ {
+ NODE_UPD.lazySet(this, next);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4f55c84f/modules/core/src/main/java/org/apache/ignite/internal/util/ManyToOneConcurrentLinkedQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ManyToOneConcurrentLinkedQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ManyToOneConcurrentLinkedQueue.java
deleted file mode 100644
index d30a5da..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ManyToOneConcurrentLinkedQueue.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package org.apache.ignite.internal.util;
-
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-/**
- *
- */
-public class ManyToOneConcurrentLinkedQueue<E>
-{
- protected static final AtomicReferenceFieldUpdater<ManyToOneConcurrentLinkedQueue, Node> TAIL_UPD =
- AtomicReferenceFieldUpdater.newUpdater(ManyToOneConcurrentLinkedQueue.class, Node.class, "tail");
-
- protected volatile ManyToOneConcurrentLinkedQueue.Node tail;
-
- private Node head;
-
- public ManyToOneConcurrentLinkedQueue()
- {
- head = new Node(null);
- TAIL_UPD.lazySet(this, head);
- }
-
- public boolean offer(final E e)
- {
- if (null == e)
- {
- throw new NullPointerException("element cannot be null");
- }
-
- final Node newTail = new Node(e);
- final Node prevTail = swapTail(newTail);
- prevTail.setNextOrdered(newTail);
-
- return true;
- }
-
- public E poll()
- {
- Object value = null;
-
- final Node node = head.next;
-
- if (null != node)
- {
- value = node.value;
- node.value = null;
- head = node;
- }
-
- return (E)value;
- }
-
- @SuppressWarnings("unchecked")
- private Node swapTail(final Node newTail)
- {
- return TAIL_UPD.getAndSet(this, newTail);
- }
-
- /**
- * Node with data.
- */
- private static class Node
- {
- public static final AtomicReferenceFieldUpdater<Node, Node> NODE_UPD =
- AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");
-
- Object value;
- volatile Node next;
-
- /**
- * Constructor.
- *
- * @param value Value.
- */
- private Node(Object value)
- {
- this.value = value;
- }
-
- /**
- * Set next node.
- *
- * @param next Next node.
- */
- void setNextOrdered(Node next)
- {
- NODE_UPD.lazySet(this, next);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4f55c84f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 17a0b8f..1375d03 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -41,7 +41,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Queue;
import java.util.Set;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -50,6 +49,7 @@ import org.apache.ignite.configuration.ConnectorConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.MPSCConcurrentLinkedQueue;
import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -1244,7 +1244,8 @@ public class GridNioServer<T> {
*/
private abstract class AbstractNioClientWorker extends GridWorker {
/** Queue of change requests on this selector. */
- private final Queue<NioOperationFuture> changeReqs = new ConcurrentLinkedDeque8<>();
+ private final MPSCConcurrentLinkedQueue<NioOperationFuture> changeReqs =
+ new MPSCConcurrentLinkedQueue<>();
/** Selector to select read events. */
private Selector selector;