You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 18:27:37 UTC
[14/31] incubator-ignite git commit: ignite-471-2: huge merge from sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
index 630db1c..9c8c2db 100644
--- a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
+++ b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
@@ -5,8 +5,12 @@
*/
/*
- * The initial version of this file was copied from JSR-166:
- * http://gee.cs.oswego.edu/dl/concurrency-interest/
+ * The latest version of the file corresponds to the following CVS commit:
+ * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/
+ * ConcurrentLinkedDeque.java?pathrev=1.33
+ *
+ * The later versions use JDK 8 specific classes that are unavailable in JDK 7.
+ * Thus those commits can't be imported.
*/
package org.jsr166;
@@ -18,6 +22,7 @@ import java.security.*;
import java.util.*;
import java.util.Queue;
+
/**
* An unbounded concurrent {@linkplain Deque deque} based on linked nodes.
* Concurrent insertion, removal, and access operations execute safely
@@ -55,13 +60,21 @@ import java.util.Queue;
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* actions subsequent to the access or removal of that element from
* the {@code ConcurrentLinkedDeque} in another thread.
- * <p>
- * Written by Doug Lea and Martin Buchholz with assistance from members of
- * JCP JSR-166 Expert Group and released to the public domain, as explained
- * at http://creativecommons.org/publicdomain/zero/1.0/
+ *
+ * <p>This class is a member of the
+ * <a href="{@docRoot}/../technotes/guides/collections/index.html">
+ * Java Collections Framework</a>.
+ *
+ * @since 1.7
+ * @author Doug Lea
+ * @author Martin Buchholz
+ * @param <E> the type of elements held in this collection
*/
-@SuppressWarnings( {"ALL"})
-public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements Deque<E> {
+@SuppressWarnings("ALL")
+public class ConcurrentLinkedDeque8<E>
+ extends AbstractCollection<E>
+ implements Deque<E>, java.io.Serializable {
+
/*
* This is an implementation of a concurrent lock-free deque
* supporting interior removes but not interior insertions, as
@@ -213,6 +226,8 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* good as we can hope for.
*/
+ private static final long serialVersionUID = 876323262645176354L;
+
/**
* A node from which the first node on list (that is, the unique node p
* with p.prev == null && p.next != p) can be reached in O(1) time.
@@ -226,7 +241,7 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* - head.item may or may not be null
* - head may not be reachable from the first or last node, or from tail
*/
- private volatile Node<E> head;
+ private transient volatile Node<E> head;
/**
* A node from which the last node on list (that is, the unique node p
@@ -240,12 +255,11 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* - tail.item may or may not be null
* - tail may not be reachable from the first or last node, or from head
*/
- private volatile Node<E> tail;
+ private transient volatile Node<E> tail;
/** */
private final LongAdder8 size = new LongAdder8();
- /** Previous and next terminators. */
private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR;
@SuppressWarnings("unchecked")
@@ -258,29 +272,17 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
return (Node<E>) NEXT_TERMINATOR;
}
- /**
- * Internal node element.
- *
- * @param <E> Node item.
- */
- @SuppressWarnings( {"PackageVisibleField", "PackageVisibleInnerClass"})
public static final class Node<E> {
volatile Node<E> prev;
volatile E item;
volatile Node<E> next;
- /**
- * Default constructor for NEXT_TERMINATOR, PREV_TERMINATOR.
- */
- Node() {
- // No-op.
+ Node() { // default constructor for NEXT_TERMINATOR, PREV_TERMINATOR
}
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext or casPrev.
- *
- * @param item Item to initialize.
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
@@ -293,73 +295,44 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
return item;
}
- /**
- * @param cmp Compare value.
- * @param val New value.
- * @return {@code True} if set.
- */
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
- /**
- * @param val New value.
- */
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
- /**
- * @param cmp Compare value.
- * @param val New value.
- * @return {@code True} if set.
- */
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
- /**
- * @param val New value.
- */
void lazySetPrev(Node<E> val) {
UNSAFE.putOrderedObject(this, prevOffset, val);
}
- /**
- * @param cmp Compare value.
- * @param val New value.
- * @return {@code True} if set.
- */
boolean casPrev(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val);
}
- /** Unsafe. */
- private static final Unsafe UNSAFE;
+ // Unsafe mechanics
- /** Previous field offset. */
+ private static final sun.misc.Unsafe UNSAFE;
private static final long prevOffset;
-
- /** Item field offset. */
private static final long itemOffset;
-
- /** Next field offset. */
private static final long nextOffset;
- /**
- * Initialize offsets.
- */
static {
try {
UNSAFE = unsafe();
-
- Class k = Node.class;
-
- prevOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("prev"));
- itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
- nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
- }
- catch (Exception e) {
+ Class<?> k = Node.class;
+ prevOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("prev"));
+ itemOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("item"));
+ nextOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("next"));
+ } catch (Exception e) {
throw new Error(e);
}
}
@@ -376,9 +349,10 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
final Node<E> newNode = new Node<E>(e);
restartFromHead:
- for (;;) {
+ for (;;)
for (Node<E> h = head, p = h, q;;) {
- if ((q = p.prev) != null && (q = (p = q).prev) != null)
+ if ((q = p.prev) != null &&
+ (q = (p = q).prev) != null)
// Check for head updates every other hop.
// If p == q, we are sure to follow head instead.
p = (h != (h = head)) ? h : q;
@@ -386,21 +360,18 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
continue restartFromHead;
else {
// p is first node
- newNode.lazySetNext(p); // CAS piggyback.
-
+ newNode.lazySetNext(p); // CAS piggyback
if (p.casPrev(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this deque,
// and for newNode to become "live".
if (p != h) // hop two nodes at a time
casHead(h, newNode); // Failure is OK.
-
return;
}
// Lost CAS race to another thread; re-read prev
}
}
- }
}
/**
@@ -446,8 +417,6 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
/**
* Links e as last element.
- *
- * @param e Element to link.
*/
private void linkLast(E e) {
checkNotNull(e);
@@ -457,9 +426,10 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
final Node<E> newNode = new Node<E>(e);
restartFromTail:
- for (;;) {
+ for (;;)
for (Node<E> t = tail, p = t, q;;) {
- if ((q = p.next) != null && (q = (p = q).next) != null)
+ if ((q = p.next) != null &&
+ (q = (p = q).next) != null)
// Check for tail updates every other hop.
// If p == q, we are sure to follow tail instead.
p = (t != (t = tail)) ? t : q;
@@ -467,21 +437,18 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
continue restartFromTail;
else {
// p is last node
- newNode.lazySetPrev(p); // CAS piggyback.
-
+ newNode.lazySetPrev(p); // CAS piggyback
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this deque,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
-
return;
}
// Lost CAS race to another thread; re-read next
}
}
- }
}
/**
@@ -563,7 +530,6 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
}
}
- /** Number of HOPs before unlinking head or tail. */
private static final int HOPS = 2;
/**
@@ -589,7 +555,7 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
/**
* Unlinks non-null node x.
*/
- private void unlink(Node<E> x) {
+ void unlink(Node<E> x) {
// assert x != null;
// assert x.item == null;
// assert x != PREV_TERMINATOR;
@@ -601,11 +567,11 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
// Unlink should not be called twice for the same node.
size.decrement();
- if (prev == null)
+ if (prev == null) {
unlinkFirst(x, next);
- else if (next == null)
+ } else if (next == null) {
unlinkLast(x, prev);
- else {
+ } else {
// Unlink interior node.
//
// This is the common case, since a series of polls at the
@@ -626,31 +592,22 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
// tail/head, before setting x's prev/next links to their
// logical approximate replacements, self/TERMINATOR.
Node<E> activePred, activeSucc;
-
boolean isFirst, isLast;
-
int hops = 1;
// Find active predecessor
for (Node<E> p = prev; ; ++hops) {
if (p.item != null) {
activePred = p;
-
isFirst = false;
-
break;
}
-
Node<E> q = p.prev;
-
if (q == null) {
if (p.next == p)
return;
-
activePred = p;
-
isFirst = true;
-
break;
}
else if (p == q)
@@ -663,22 +620,15 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
for (Node<E> p = next; ; ++hops) {
if (p.item != null) {
activeSucc = p;
-
isLast = false;
-
break;
}
-
Node<E> q = p.next;
-
if (q == null) {
if (p.prev == p)
return;
-
activeSucc = p;
-
isLast = true;
-
break;
}
else if (p == q)
@@ -688,8 +638,9 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
}
// TODO: better HOP heuristics
- // Always squeeze out interior deleted nodes.
- if (hops < HOPS && (isFirst | isLast))
+ if (hops < HOPS
+ // always squeeze out interior deleted nodes
+ && (isFirst | isLast))
return;
// Squeeze out deleted nodes between activePred and
@@ -699,6 +650,7 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
// Try to gc-unlink, if possible
if ((isFirst | isLast) &&
+
// Recheck expected state of predecessor and successor
(activePred.next == activeSucc) &&
(activeSucc.prev == activePred) &&
@@ -793,11 +745,11 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
// Either head already points to an active node, or we keep
// trying to cas it to the first node until it does.
Node<E> h, p, q;
-
restartFromHead:
while ((h = head).item == null && (p = h.prev) != null) {
for (;;) {
- if ((q = p.prev) == null || (q = (p = q).prev) == null) {
+ if ((q = p.prev) == null ||
+ (q = (p = q).prev) == null) {
// It is possible that p is PREV_TERMINATOR,
// but if so, the CAS is guaranteed to fail.
if (casHead(h, p))
@@ -823,11 +775,11 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
// Either tail already points to an active node, or we keep
// trying to cas it to the last node until it does.
Node<E> t, p, q;
-
restartFromTail:
while ((t = tail).item == null && (p = t.next) != null) {
for (;;) {
- if ((q = p.next) == null || (q = (p = q).next) == null) {
+ if ((q = p.next) == null ||
+ (q = (p = q).next) == null) {
// It is possible that p is NEXT_TERMINATOR,
// but if so, the CAS is guaranteed to fail.
if (casTail(t, p))
@@ -843,9 +795,6 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
}
}
- /**
- * @param x Node to start from.
- */
private void skipDeletedPredecessors(Node<E> x) {
whileActive:
do {
@@ -854,18 +803,14 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
// assert x != NEXT_TERMINATOR;
// assert x != PREV_TERMINATOR;
Node<E> p = prev;
-
findActive:
for (;;) {
if (p.item != null)
break findActive;
-
Node<E> q = p.prev;
-
if (q == null) {
if (p.next == p)
continue whileActive;
-
break findActive;
}
else if (p == q)
@@ -881,9 +826,6 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
} while (x.item != null || x.next == null);
}
- /**
- * @param x Node to start from.
- */
private void skipDeletedSuccessors(Node<E> x) {
whileActive:
do {
@@ -892,19 +834,14 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
// assert x != NEXT_TERMINATOR;
// assert x != PREV_TERMINATOR;
Node<E> p = next;
-
findActive:
-
for (;;) {
if (p.item != null)
break findActive;
-
Node<E> q = p.next;
-
if (q == null) {
if (p.prev == p)
continue whileActive;
-
break findActive;
}
else if (p == q)
@@ -917,22 +854,17 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
if (next == p || x.casNext(next, p))
return;
- }
- while (x.item != null || x.prev == null);
+ } while (x.item != null || x.prev == null);
}
/**
* Returns the successor of p, or the first node if p.next has been
* linked to self, which will only be true if traversing with a
* stale pointer that is now off the list.
- *
- * @param p Node to find successor for.
- * @return Successor node.
*/
- final Node<E> successor(Node<E> p) {
+ final Node<E> succ(Node<E> p) {
// TODO: should we skip deleted nodes here?
Node<E> q = p.next;
-
return (p == q) ? first() : q;
}
@@ -940,11 +872,8 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* Returns the predecessor of p, or the last node if p.prev has been
* linked to self, which will only be true if traversing with a
* stale pointer that is now off the list.
- *
- * @param p Node to find predecessor for.
- * @return Predecessor node.
*/
- final Node<E> predecessor(Node<E> p) {
+ final Node<E> pred(Node<E> p) {
Node<E> q = p.prev;
return (p == q) ? last() : q;
}
@@ -954,10 +883,7 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* p.prev == null && p.next != p
* The returned node may or may not be logically deleted.
* Guarantees that head is set to the returned node.
- *
- * @return First node.
*/
- @SuppressWarnings( {"TooBroadScope"})
Node<E> first() {
restartFromHead:
for (;;)
@@ -982,10 +908,7 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* p.next == null && p.prev != p
* The returned node may or may not be logically deleted.
* Guarantees that tail is set to the returned node.
- *
- * @return Last node.
*/
- @SuppressWarnings( {"TooBroadScope"})
Node<E> last() {
restartFromTail:
for (;;)
@@ -1005,6 +928,8 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
}
}
+ // Minor convenience utilities
+
/**
* Throws NullPointerException if argument is null.
*
@@ -1025,7 +950,6 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
private E screenNullResult(E v) {
if (v == null)
throw new NoSuchElementException();
-
return v;
}
@@ -1033,18 +957,15 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* Creates an array list and fills it with elements of this list.
* Used by toArray.
*
- * @return the arrayList
+ * @return the array list
*/
private ArrayList<E> toArrayList() {
ArrayList<E> list = new ArrayList<E>();
-
- for (Node<E> p = first(); p != null; p = successor(p)) {
+ for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
-
if (item != null)
list.add(item);
}
-
return list;
}
@@ -1052,7 +973,7 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* Constructs an empty deque.
*/
public ConcurrentLinkedDeque8() {
- head = tail = new Node<E>();
+ head = tail = new Node<E>(null);
}
/**
@@ -1064,15 +985,12 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
- public ConcurrentLinkedDeque8(Iterable<? extends E> c) {
+ public ConcurrentLinkedDeque8(Collection<? extends E> c) {
// Copy c into a private chain of Nodes
Node<E> h = null, t = null;
-
for (E e : c) {
checkNotNull(e);
-
Node<E> newNode = new Node<E>(e);
-
if (h == null)
h = t = newNode;
else {
@@ -1081,15 +999,11 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
t = newNode;
}
}
-
initHeadTail(h, t);
}
/**
* Initializes head and tail, ensuring invariants hold.
- *
- * @param h Head.
- * @param t Tail.
*/
private void initHeadTail(Node<E> h, Node<E> t) {
if (h == t) {
@@ -1098,15 +1012,11 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
else {
// Avoid edge case of a single Node with non-null item.
Node<E> newNode = new Node<E>(null);
-
t.lazySetNext(newNode);
-
newNode.lazySetPrev(t);
-
t = newNode;
}
}
-
head = h;
tail = t;
}
@@ -1118,21 +1028,11 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
*
* @throws NullPointerException if the specified element is null
*/
- @Override public void addFirst(E e) {
+ public void addFirst(E e) {
linkFirst(e);
}
/**
- * Same as {@link #addFirst(Object)}, but returns new node.
- *
- * @param e Element to add.
- * @return New node.
- */
- public Node<E> addFirstx(E e) {
- return linkFirstx(e);
- }
-
- /**
* Inserts the specified element at the end of this deque.
* As the deque is unbounded, this method will never throw
* {@link IllegalStateException}.
@@ -1141,7 +1041,7 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
*
* @throws NullPointerException if the specified element is null
*/
- @Override public void addLast(E e) {
+ public void addLast(E e) {
linkLast(e);
}
@@ -1162,9 +1062,8 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* @return {@code true} (as specified by {@link Deque#offerFirst})
* @throws NullPointerException if the specified element is null
*/
- @Override public boolean offerFirst(E e) {
+ public boolean offerFirst(E e) {
linkFirst(e);
-
return true;
}
@@ -1187,9 +1086,8 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* @return {@code true} (as specified by {@link Deque#offerLast})
* @throws NullPointerException if the specified element is null
*/
- @Override public boolean offerLast(E e) {
+ public boolean offerLast(E e) {
linkLast(e);
-
return true;
}
@@ -1203,15 +1101,12 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
return linkLastx(e);
}
- /** {@inheritDoc} */
- @Override public E peekFirst() {
- for (Node<E> p = first(); p != null; p = successor(p)) {
+ public E peekFirst() {
+ for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
-
if (item != null)
return item;
}
-
return null;
}
@@ -1222,7 +1117,7 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* @return The header node of this deque, or <tt>null</tt> if this deque is empty
*/
public Node<E> peekFirstx() {
- for (Node<E> p = first(); p != null; p = successor(p)) {
+ for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
if (item != null)
@@ -1232,76 +1127,67 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
return null;
}
- /** {@inheritDoc} */
- @Override public E peekLast() {
- for (Node<E> p = last(); p != null; p = predecessor(p)) {
+ public E peekLast() {
+ for (Node<E> p = last(); p != null; p = pred(p)) {
E item = p.item;
-
if (item != null)
return item;
}
-
return null;
}
/**
* @throws NoSuchElementException {@inheritDoc}
*/
- @Override public E getFirst() {
+ public E getFirst() {
return screenNullResult(peekFirst());
}
/**
* @throws NoSuchElementException {@inheritDoc}
*/
- @Override public E getLast() {
+ public E getLast() {
return screenNullResult(peekLast());
}
- /** {@inheritDoc} */
- @Override public E pollFirst() {
- for (Node<E> p = first(); p != null; p = successor(p)) {
+ public E pollFirst() {
+ for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
-
if (item != null && p.casItem(item, null)) {
unlink(p);
-
return item;
}
}
-
return null;
}
- /** {@inheritDoc} */
- @Override public E pollLast() {
- for (Node<E> p = last(); p != null; p = predecessor(p)) {
+ public E pollLast() {
+ for (Node<E> p = last(); p != null; p = pred(p)) {
E item = p.item;
-
if (item != null && p.casItem(item, null)) {
unlink(p);
-
return item;
}
}
-
return null;
}
/**
* @throws NoSuchElementException {@inheritDoc}
*/
- @Override public E removeFirst() {
+ public E removeFirst() {
return screenNullResult(pollFirst());
}
/**
* @throws NoSuchElementException {@inheritDoc}
*/
- @Override public E removeLast() {
+ public E removeLast() {
return screenNullResult(pollLast());
}
+ // *** Queue and stack methods ***
+
/**
* Inserts the specified element at the tail of this deque.
* As the deque is unbounded, this method will never return {@code false}.
@@ -1309,21 +1195,11 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
- @Override public boolean offer(E e) {
+ public boolean offer(E e) {
return offerLast(e);
}
/**
- * Same as {@link #offer(Object)}, but returns new {@link Node}.
- *
- * @param e Element to add.
- * @return New node.
- */
- public Node<E> offerx(E e) {
- return offerLastx(e);
- }
-
- /**
* Inserts the specified element at the tail of this deque.
* As the deque is unbounded, this method will never throw
* {@link IllegalStateException} or return {@code false}.
@@ -1331,7 +1207,7 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
*/
- @Override public boolean add(E e) {
+ public boolean add(E e) {
return offerLast(e);
}
@@ -1345,20 +1221,12 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
return offerLastx(e);
}
- /** {@inheritDoc} */
- @Override public E poll() {
- return pollFirst();
- }
-
- /** {@inheritDoc} */
- @Override public E remove() {
- return removeFirst();
- }
-
- /** {@inheritDoc} */
- @Override public E peek() {
- return peekFirst();
- }
+ public E poll() { return pollFirst(); }
+ public E remove() { return removeFirst(); }
+ public E peek() { return peekFirst(); }
+ public E element() { return getFirst(); }
+ public void push(E e) { addFirst(e); }
+ public E pop() { return removeFirst(); }
/**
* Retrieves, but does not remove, the header node of the queue represented by
@@ -1374,21 +1242,6 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
return peekFirstx();
}
- /** {@inheritDoc} */
- @Override public E element() {
- return getFirst();
- }
-
- /** {@inheritDoc} */
- @Override public void push(E e) {
- addFirst(e);
- }
-
- /** {@inheritDoc} */
- @Override public E pop() {
- return removeFirst();
- }
-
/**
* Removes the first element {@code e} such that
* {@code o.equals(e)}, if such an element exists in this deque.
@@ -1398,19 +1251,15 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* @return {@code true} if the deque contained the specified element
* @throws NullPointerException if the specified element is null
*/
- @Override public boolean removeFirstOccurrence(Object o) {
+ public boolean removeFirstOccurrence(Object o) {
checkNotNull(o);
-
- for (Node<E> p = first(); p != null; p = successor(p)) {
+ for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
-
if (item != null && o.equals(item) && p.casItem(item, null)) {
unlink(p);
-
return true;
}
}
-
return false;
}
@@ -1423,19 +1272,15 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* @return {@code true} if the deque contained the specified element
* @throws NullPointerException if the specified element is null
*/
- @Override public boolean removeLastOccurrence(Object o) {
+ public boolean removeLastOccurrence(Object o) {
checkNotNull(o);
-
- for (Node<E> p = last(); p != null; p = predecessor(p)) {
+ for (Node<E> p = last(); p != null; p = pred(p)) {
E item = p.item;
-
if (item != null && o.equals(item) && p.casItem(item, null)) {
unlink(p);
-
return true;
}
}
-
return false;
}
@@ -1446,17 +1291,13 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* @param o element whose presence in this deque is to be tested
* @return {@code true} if this deque contains the specified element
*/
- @Override public boolean contains(Object o) {
- if (o == null)
- return false;
-
- for (Node<E> p = first(); p != null; p = successor(p)) {
+ public boolean contains(Object o) {
+ if (o == null) return false;
+ for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
-
if (item != null && o.equals(item))
return true;
}
-
return false;
}
@@ -1465,7 +1306,7 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
*
* @return {@code true} if this collection contains no elements
*/
- @Override public boolean isEmpty() {
+ public boolean isEmpty() {
return peekFirst() == null;
}
@@ -1497,16 +1338,14 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
*
* @return the number of elements in this deque
*/
- @Override public int size() {
- int cnt = 0;
-
- for (Node<E> p = first(); p != null; p = successor(p))
+ public int size() {
+ int count = 0;
+ for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
- if (++cnt == Integer.MAX_VALUE)
+ if (++count == Integer.MAX_VALUE)
break;
-
- return cnt;
+ return count;
}
/**
@@ -1525,7 +1364,7 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* @return {@code true} if the deque contained the specified element
* @throws NullPointerException if the specified element is null
*/
- @Override public boolean remove(Object o) {
+ public boolean remove(Object o) {
return removeFirstOccurrence(o);
}
@@ -1541,8 +1380,7 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* of its elements are null
* @throws IllegalArgumentException if the collection is this deque
*/
- @SuppressWarnings( {"TooBroadScope"})
- @Override public boolean addAll(Collection<? extends E> c) {
+ public boolean addAll(Collection<? extends E> c) {
if (c == this)
// As historically specified in AbstractQueue#addAll
throw new IllegalArgumentException();
@@ -1554,19 +1392,14 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
for (E e : c) {
checkNotNull(e);
-
Node<E> newNode = new Node<E>(e);
-
if (beginningOfTheEnd == null) {
beginningOfTheEnd = last = newNode;
-
s++;
}
else {
last.lazySetNext(newNode);
-
newNode.lazySetPrev(last);
-
last = newNode;
s++;
@@ -1580,9 +1413,10 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
// Atomically append the chain at the tail of this collection
restartFromTail:
- for (;;) {
+ for (;;)
for (Node<E> t = tail, p = t, q;;) {
- if ((q = p.next) != null && (q = (p = q).next) != null)
+ if ((q = p.next) != null &&
+ (q = (p = q).next) != null)
// Check for tail updates every other hop.
// If p == q, we are sure to follow tail instead.
p = (t != (t = tail)) ? t : q;
@@ -1591,7 +1425,6 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
else {
// p is last node
beginningOfTheEnd.lazySetPrev(p); // CAS piggyback
-
if (p.casNext(null, beginningOfTheEnd)) {
// Successful CAS is the linearization point
// for all elements to be added to this deque.
@@ -1599,26 +1432,22 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
// Try a little harder to update tail,
// since we may be adding many elements.
t = tail;
-
if (last.next == null)
casTail(t, last);
}
-
return true;
}
// Lost CAS race to another thread; re-read next
}
}
- }
}
/**
* Removes all of the elements from this deque.
*/
- @Override public void clear() {
- while (pollFirst() != null) {
- // No-op.
- }
+ public void clear() {
+ while (pollFirst() != null)
+ ;
}
/**
@@ -1634,7 +1463,7 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
*
* @return an array containing all of the elements in this deque
*/
- @Override public Object[] toArray() {
+ public Object[] toArray() {
return toArrayList().toArray();
}
@@ -1661,8 +1490,7 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* The following code can be used to dump the deque into a newly
* allocated array of {@code String}:
*
- * <pre>
- * String[] y = x.toArray(new String[0]);</pre>
+ * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
*
* Note that {@code toArray(new Object[0])} is identical in function to
* {@code toArray()}.
@@ -1676,8 +1504,7 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
* this deque
* @throws NullPointerException if the specified array is null
*/
- @SuppressWarnings( {"SuspiciousToArrayCall"})
- @Override public <T> T[] toArray(T[] a) {
+ public <T> T[] toArray(T[] a) {
return toArrayList().toArray(a);
}
@@ -1694,8 +1521,8 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
*
* @return an iterator over the elements in this deque in proper sequence
*/
- @Override public Iterator<E> iterator() {
- return new Iter();
+ public Iterator<E> iterator() {
+ return new Itr();
}
/**
@@ -1712,26 +1539,11 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
*
* @return an iterator over the elements in this deque in reverse order
*/
- @Override public Iterator<E> descendingIterator() {
- return new DescendingIter();
- }
-
- /**
- * Extended iterator interface.
- */
- public interface IteratorEx<E> extends Iterator<E> {
- /**
- * Same semantics as iterator's remove, but will return {@code false} if remove did not happen.
- *
- * @return {@code True} if element was removed by this call, {@code false} otherwise.
- */
- public boolean removex();
+ public Iterator<E> descendingIterator() {
+ return new DescendingItr();
}
- /**
- * Abstract iterator.
- */
- private abstract class AbstractIter implements IteratorEx<E> {
+ private abstract class AbstractItr implements Iterator<E> {
/**
* Next node to return item for.
*/
@@ -1751,21 +1563,10 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
*/
private Node<E> lastRet;
- /**
- * @return Starting node.
- */
abstract Node<E> startNode();
-
- /**
- * @param p Node.
- * @return Next node.
- */
abstract Node<E> nextNode(Node<E> p);
- /**
- * Advances to first element.
- */
- AbstractIter() {
+ AbstractItr() {
advance();
}
@@ -1777,150 +1578,127 @@ public class ConcurrentLinkedDeque8<E> extends AbstractCollection<E> implements
lastRet = nextNode;
Node<E> p = (nextNode == null) ? startNode() : nextNode(nextNode);
-
for (;; p = nextNode(p)) {
if (p == null) {
// p might be active end or TERMINATOR node; both are OK
nextNode = null;
nextItem = null;
-
break;
}
-
E item = p.item;
-
if (item != null) {
nextNode = p;
nextItem = item;
-
break;
}
}
}
- /** {@inheritDoc} */
- @Override public boolean hasNext() {
+ public boolean hasNext() {
return nextItem != null;
}
- /** {@inheritDoc} */
- @Override public E next() {
+ public E next() {
E item = nextItem;
-
- if (item == null)
- throw new NoSuchElementException();
-
+ if (item == null) throw new NoSuchElementException();
advance();
-
return item;
}
- /** {@inheritDoc} */
- @Override public void remove() {
+ public void remove() {
Node<E> l = lastRet;
-
- if (l == null)
- throw new IllegalStateException();
-
- unlinkx(l);
-
+ if (l == null) throw new IllegalStateException();
+ l.item = null;
+ unlink(l);
lastRet = null;
}
+ }
- /** {@inheritDoc} */
- @Override public boolean removex() {
- Node<E> l = lastRet;
-
- if (l == null)
- throw new IllegalStateException();
-
- boolean res = unlinkx(l);
-
- lastRet = null;
+ /** Forward iterator */
+ private class Itr extends AbstractItr {
+ Node<E> startNode() { return first(); }
+ Node<E> nextNode(Node<E> p) { return succ(p); }
+ }
- return res;
- }
+ /** Descending iterator */
+ private class DescendingItr extends AbstractItr {
+ Node<E> startNode() { return last(); }
+ Node<E> nextNode(Node<E> p) { return pred(p); }
}
/**
- * Forward iterator
+ * Saves this deque to a stream (that is, serializes it).
+ *
+ * @serialData All of the elements (each an {@code E}) in
+ * the proper order, followed by a null
*/
- private class Iter extends AbstractIter {
- /** {@inheritDoc} */
- @Override Node<E> startNode() {
- return first();
- }
+ private void writeObject(java.io.ObjectOutputStream s)
+ throws java.io.IOException {
+
+ // Write out any hidden stuff
+ s.defaultWriteObject();
- /** {@inheritDoc} */
- @Override Node<E> nextNode(Node<E> p) {
- return successor(p);
+ // Write out all elements in the proper order.
+ for (Node<E> p = first(); p != null; p = succ(p)) {
+ E item = p.item;
+ if (item != null)
+ s.writeObject(item);
}
+
+ // Use trailing null as sentinel
+ s.writeObject(null);
}
/**
- * Descending iterator.
+ * Reconstitutes this deque from a stream (that is, deserializes it).
*/
- private class DescendingIter extends AbstractIter {
- /** {@inheritDoc} */
- @Override Node<E> startNode() {
- return last();
- }
+ private void readObject(java.io.ObjectInputStream s)
+ throws java.io.IOException, ClassNotFoundException {
+ s.defaultReadObject();
- /** {@inheritDoc} */
- @Override Node<E> nextNode(Node<E> p) {
- return predecessor(p);
+ // Read in elements until trailing null sentinel found
+ Node<E> h = null, t = null;
+ Object item;
+ while ((item = s.readObject()) != null) {
+ @SuppressWarnings("unchecked")
+ Node<E> newNode = new Node<E>((E) item);
+ if (h == null)
+ h = t = newNode;
+ else {
+ t.lazySetNext(newNode);
+ newNode.lazySetPrev(t);
+ t = newNode;
+ }
}
+ initHeadTail(h, t);
}
- /**
- * CAS for head.
- *
- * @param cmp Compare value.
- * @param val New value.
- * @return {@code True} if set.
- */
private boolean casHead(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}
- /**
- * CAS for tail.
- *
- * @param cmp Compare value.
- * @param val New value.
- * @return {@code True} if set.
- */
private boolean casTail(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}
- /** Unsafe. */
- private static final Unsafe UNSAFE;
+ // Unsafe mechanics
- /** Head offset. */
+ private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
-
- /** Tail offset. */
private static final long tailOffset;
-
- /**
- * Initialize terminators using unsafe semantics.
- */
static {
PREV_TERMINATOR = new Node<Object>();
PREV_TERMINATOR.next = PREV_TERMINATOR;
NEXT_TERMINATOR = new Node<Object>();
NEXT_TERMINATOR.prev = NEXT_TERMINATOR;
-
try {
UNSAFE = unsafe();
-
- Class cls = ConcurrentLinkedDeque8.class;
-
- headOffset = UNSAFE.objectFieldOffset(cls.getDeclaredField("head"));
- tailOffset = UNSAFE.objectFieldOffset(cls.getDeclaredField("tail"));
- }
- catch (Exception e) {
+ Class<?> k = ConcurrentLinkedDeque8.class;
+ headOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("head"));
+ tailOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("tail"));
+ } catch (Exception e) {
throw new Error(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/jsr166/LongAdder8.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/LongAdder8.java b/modules/core/src/main/java/org/jsr166/LongAdder8.java
index 2480a5d..79ea32e 100644
--- a/modules/core/src/main/java/org/jsr166/LongAdder8.java
+++ b/modules/core/src/main/java/org/jsr166/LongAdder8.java
@@ -5,8 +5,11 @@
*/
/*
- * The initial version of this file was copied from JSR-166:
- * http://gee.cs.oswego.edu/dl/concurrency-interest/
+ * The latest version of the file corresponds to the following CVS commit:
+ * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/atomic/LongAdder.java?pathrev=1.3
+ *
+ * The later versions are based on an updated Striped64 that uses the java.util.function package, which is unavailable in JDK 7.
+ * Thus they can't be imported.
*/
package org.jsr166;
@@ -22,7 +25,7 @@ import java.util.concurrent.atomic.*;
* #longValue}) returns the current total combined across the
* variables maintaining the sum.
*
- * <p> This class is usually preferable to {@link AtomicLong} when
+ * <p>This class is usually preferable to {@link AtomicLong} when
* multiple threads update a common sum that is used for purposes such
* as collecting statistics, not for fine-grained synchronization
* control. Under low update contention, the two classes have similar
@@ -36,7 +39,7 @@ import java.util.concurrent.atomic.*;
* collection keys.
*
* <p><em>jsr166e note: This class is targeted to be placed in
- * java.util.concurrent.atomic<em>
+ * java.util.concurrent.atomic.</em>
*
* @since 1.8
* @author Doug Lea
@@ -67,8 +70,8 @@ public class LongAdder8 extends Striped64_8 implements Serializable {
boolean uncontended = true;
int h = (hc = threadHashCode.get()).code;
if (as == null || (n = as.length) < 1 ||
- (a = as[(n - 1) & h]) == null ||
- !(uncontended = a.cas(v = a.value, v + x)))
+ (a = as[(n - 1) & h]) == null ||
+ !(uncontended = a.cas(v = a.value, v + x)))
retryUpdate(x, hc, uncontended);
}
}
@@ -149,6 +152,14 @@ public class LongAdder8 extends Striped64_8 implements Serializable {
}
/**
+ * Returns the String representation of the {@link #sum}.
+ * @return the String representation of the {@link #sum}
+ */
+ public String toString() {
+ return Long.toString(sum());
+ }
+
+ /**
* Equivalent to {@link #sum}.
*
* @return the sum
@@ -182,25 +193,17 @@ public class LongAdder8 extends Striped64_8 implements Serializable {
}
private void writeObject(java.io.ObjectOutputStream s)
- throws java.io.IOException {
+ throws java.io.IOException {
s.defaultWriteObject();
s.writeLong(sum());
}
private void readObject(ObjectInputStream s)
- throws IOException, ClassNotFoundException {
+ throws IOException, ClassNotFoundException {
s.defaultReadObject();
busy = 0;
cells = null;
base = s.readLong();
}
- /**
- * Returns the String representation of the {@link #sum}.
- *
- * @return String representation of the {@link #sum}
- */
- public String toString() {
- return Long.toString(sum());
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/jsr166/README.txt
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/README.txt b/modules/core/src/main/java/org/jsr166/README.txt
new file mode 100644
index 0000000..491f2b4
--- /dev/null
+++ b/modules/core/src/main/java/org/jsr166/README.txt
@@ -0,0 +1,11 @@
+Package contains classes that originate from JSR166.
+
+The files were imported from the following repositories:
+- http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/
+- http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jdk7/
+
+To keep the imported files up-to-date each of them (except ConcurrentLinkedHashMap) contains a reference to
+a corresponding CVS commit.
+
+For more information please refer to the community page:
+http://gee.cs.oswego.edu/dl/concurrency-interest/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/jsr166/Striped64_8.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/Striped64_8.java b/modules/core/src/main/java/org/jsr166/Striped64_8.java
index 9a4b1db..7281334 100644
--- a/modules/core/src/main/java/org/jsr166/Striped64_8.java
+++ b/modules/core/src/main/java/org/jsr166/Striped64_8.java
@@ -5,8 +5,11 @@
*/
/*
- * The initial version of this file was copied from JSR-166:
- * http://gee.cs.oswego.edu/dl/concurrency-interest/
+ * The latest version of the file corresponds to the following CVS commit:
+ * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/atomic/Striped64.java?pathrev=1.1
+ *
+ * The later versions use classes from the java.util.function package that are unavailable in JDK 7.
+ * Thus they can't be imported.
*/
package org.jsr166;
@@ -329,18 +332,19 @@ abstract class Striped64_8 extends Number {
} catch (SecurityException se) {
try {
return java.security.AccessController.doPrivileged
- (new java.security
- .PrivilegedExceptionAction<sun.misc.Unsafe>() {
+ (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
public sun.misc.Unsafe run() throws Exception {
- java.lang.reflect.Field f = sun.misc
- .Unsafe.class.getDeclaredField("theUnsafe");
+ java.lang.reflect.Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
- return (sun.misc.Unsafe) f.get(null);
- }});
+
+ return (sun.misc.Unsafe)f.get(null);
+ }
+ });
} catch (java.security.PrivilegedActionException e) {
throw new RuntimeException("Could not initialize intrinsics",
- e.getCause());
+ e.getCause());
}
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/jsr166/ThreadLocalRandom8.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/ThreadLocalRandom8.java b/modules/core/src/main/java/org/jsr166/ThreadLocalRandom8.java
index d7d5736..192db47 100644
--- a/modules/core/src/main/java/org/jsr166/ThreadLocalRandom8.java
+++ b/modules/core/src/main/java/org/jsr166/ThreadLocalRandom8.java
@@ -5,8 +5,12 @@
*/
/*
- * The initial version of this file was copied from JSR-166:
- * http://gee.cs.oswego.edu/dl/concurrency-interest/
+ * The latest version of the file corresponds to the following CVS commit:
+ * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jdk7/java/util/concurrent/
+ * ThreadLocalRandom.java?pathrev=1.3
+ *
+ * Note that the repository above is JDK 7 based and is kept up-to-date as well.
+ * The main repository (JDK 8 based) relies heavily on JDK 8 features that are unavailable in JDK 7.
*/
package org.jsr166;
@@ -22,7 +26,8 @@ import java.util.*;
* than shared {@code Random} objects in concurrent programs will
* typically encounter much less overhead and contention. Use of
* {@code ThreadLocalRandom} is particularly appropriate when multiple
- * tasks use random numbers in parallel in thread pools.
+ * tasks (for example, each a {@link ForkJoinTask}) use random numbers
+ * in parallel in thread pools.
*
* <p>Usages of this class should typically be of the form:
* {@code ThreadLocalRandom.current().nextX(...)} (where
@@ -38,7 +43,7 @@ import java.util.*;
*/
@SuppressWarnings("ALL")
public class ThreadLocalRandom8 extends Random {
- // same constants as Random, but must be re-declared because private
+ // same constants as Random, but must be redeclared because private
private static final long multiplier = 0x5DEECE66DL;
private static final long addend = 0xBL;
private static final long mask = (1L << 48) - 1;
@@ -112,9 +117,9 @@ public class ThreadLocalRandom8 extends Random {
*
* @param least the least value returned
* @param bound the upper bound (exclusive)
+ * @return the next value
* @throws IllegalArgumentException if least greater than or equal
* to bound
- * @return the next value
*/
public int nextInt(int least, int bound) {
if (least >= bound)
@@ -177,7 +182,7 @@ public class ThreadLocalRandom8 extends Random {
* @throws IllegalArgumentException if n is not positive
*/
public double nextDouble(double n) {
- if (n <= 0)
+ if (!(n > 0))
throw new IllegalArgumentException("n must be positive");
return nextDouble() * n;
}
@@ -199,4 +204,4 @@ public class ThreadLocalRandom8 extends Random {
}
private static final long serialVersionUID = -5851777807851030925L;
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/jsr166/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/package-info.java b/modules/core/src/main/java/org/jsr166/package-info.java
index 135297c..6e839bb 100644
--- a/modules/core/src/main/java/org/jsr166/package-info.java
+++ b/modules/core/src/main/java/org/jsr166/package-info.java
@@ -5,6 +5,16 @@
*/
/**
- * Classes that were originally introduced in JSR166.
+ * Package contains classes that originate from JSR166.
+ *
+ * The files were imported from the following repositories:
+ * - http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/
+ * - http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jdk7/
+ *
+ * To keep the imported files up-to-date each of them (except ConcurrentLinkedHashMap) contains a reference to
+ * a corresponding CVS commit.
+ *
+ * For more information please refer to the community page:
+ * http://gee.cs.oswego.edu/dl/concurrency-interest/
*/
package org.jsr166;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/GridSuppressedExceptionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/GridSuppressedExceptionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/GridSuppressedExceptionSelfTest.java
index c41bdaf..4c27d50 100644
--- a/modules/core/src/test/java/org/apache/ignite/GridSuppressedExceptionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/GridSuppressedExceptionSelfTest.java
@@ -93,7 +93,9 @@ public class GridSuppressedExceptionSelfTest extends TestCase {
*
* @throws Exception If failed.
*/
- public void _testStackTrace() throws Exception {
+ public void testStackTrace() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-818");
+
IgniteCheckedException me = new IgniteCheckedException("Test message.");
for (int i = 5; i < 20; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
new file mode 100644
index 0000000..467349f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.fair.*;
+import org.apache.ignite.cache.affinity.rendezvous.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class IgniteClientNodeAffinityTest extends GridCommonAbstractTest {
+ /** */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int NODE_CNT = 4;
+
+ /** */
+ private static final String CACHE1 = "cache1";
+
+ /** */
+ private static final String CACHE2 = "cache2";
+
+ /** */
+ private static final String CACHE3 = "cache3";
+
+ /** */
+ private static final String CACHE4 = "cache4";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ if (gridName.equals(getTestGridName(NODE_CNT - 1)))
+ cfg.setClientMode(true);
+
+ CacheConfiguration ccfg1 = new CacheConfiguration();
+
+ ccfg1.setBackups(1);
+ ccfg1.setName(CACHE1);
+ ccfg1.setAffinity(new RendezvousAffinityFunction());
+ ccfg1.setNodeFilter(new TestNodesFilter());
+
+ CacheConfiguration ccfg2 = new CacheConfiguration();
+
+ ccfg2.setBackups(1);
+ ccfg2.setName(CACHE2);
+ ccfg2.setAffinity(new RendezvousAffinityFunction());
+
+ CacheConfiguration ccfg3 = new CacheConfiguration();
+
+ ccfg3.setBackups(1);
+ ccfg3.setName(CACHE3);
+ ccfg3.setAffinity(new FairAffinityFunction());
+ ccfg3.setNodeFilter(new TestNodesFilter());
+
+ CacheConfiguration ccfg4 = new CacheConfiguration();
+
+ ccfg4.setCacheMode(REPLICATED);
+ ccfg4.setName(CACHE4);
+ ccfg4.setNodeFilter(new TestNodesFilter());
+
+ cfg.setCacheConfiguration(ccfg1, ccfg2, ccfg3, ccfg4);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(NODE_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeNotInAffinity() throws Exception {
+ checkCache(CACHE1, 2);
+
+ checkCache(CACHE2, 2);
+
+ checkCache(CACHE3, 2);
+
+ checkCache(CACHE4, 3);
+
+ Ignite client = ignite(NODE_CNT - 1);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setBackups(0);
+
+ ccfg.setNodeFilter(new TestNodesFilter());
+
+ try (IgniteCache<Integer, Integer> cache = client.createCache(ccfg)) {
+ checkCache(null, 1);
+ }
+
+ try (IgniteCache<Integer, Integer> cache = client.createCache(ccfg, new NearCacheConfiguration())) {
+ checkCache(null, 1);
+ }
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @param expNodes Expected number of nodes per partition.
+ */
+ private void checkCache(String cacheName, int expNodes) {
+ log.info("Test cache: " + cacheName);
+
+ Ignite client = ignite(NODE_CNT - 1);
+
+ assertTrue(client.configuration().isClientMode());
+
+ ClusterNode clientNode = client.cluster().localNode();
+
+ for (int i = 0; i < NODE_CNT; i++) {
+ Ignite ignite = ignite(i);
+
+ Affinity<Integer> aff = ignite.affinity(cacheName);
+
+ for (int part = 0; part < aff.partitions(); part++) {
+ Collection<ClusterNode> nodes = aff.mapPartitionToPrimaryAndBackups(part);
+
+ assertEquals(expNodes, nodes.size());
+
+ assertFalse(nodes.contains(clientNode));
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestNodesFilter implements IgnitePredicate<ClusterNode> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode clusterNode) {
+ Boolean attr = clusterNode.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE);
+
+ assertNotNull(attr);
+
+ assertFalse(attr);
+
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
new file mode 100644
index 0000000..adac0b2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import java.io.*;
+import java.sql.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Tests for store session listeners.
+ */
+public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridCommonAbstractTest implements Serializable {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ protected static final String URL = "jdbc:h2:mem:example;DB_CLOSE_DELAY=-1";
+
+ /** */
+ protected static final AtomicInteger loadCacheCnt = new AtomicInteger();
+
+ /** */
+ protected static final AtomicInteger loadCnt = new AtomicInteger();
+
+ /** */
+ protected static final AtomicInteger writeCnt = new AtomicInteger();
+
+ /** */
+ protected static final AtomicInteger deleteCnt = new AtomicInteger();
+
+ /** */
+ protected static final AtomicInteger reuseCnt = new AtomicInteger();
+
+ /** */
+ protected static final AtomicBoolean write = new AtomicBoolean();
+
+ /** */
+ protected static final AtomicBoolean fail = new AtomicBoolean();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGridsMultiThreaded(3);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ try (Connection conn = DriverManager.getConnection(URL)) {
+ conn.createStatement().executeUpdate("DROP TABLE IF EXISTS Table1");
+ conn.createStatement().executeUpdate("DROP TABLE IF EXISTS Table2");
+
+ conn.createStatement().executeUpdate("CREATE TABLE Table1 (id INT AUTO_INCREMENT, key INT, value INT)");
+ conn.createStatement().executeUpdate("CREATE TABLE Table2 (id INT AUTO_INCREMENT, key INT, value INT)");
+ }
+
+ loadCacheCnt.set(0);
+ loadCnt.set(0);
+ writeCnt.set(0);
+ deleteCnt.set(0);
+ reuseCnt.set(0);
+
+ write.set(false);
+ fail.set(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicCache() throws Exception {
+ CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, CacheAtomicityMode.ATOMIC);
+
+ try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg)) {
+ cache.loadCache(null);
+ cache.get(1);
+ cache.put(1, 1);
+ cache.remove(1);
+ }
+
+ assertEquals(3, loadCacheCnt.get());
+ assertEquals(1, loadCnt.get());
+ assertEquals(1, writeCnt.get());
+ assertEquals(1, deleteCnt.get());
+ assertEquals(0, reuseCnt.get());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTransactionalCache() throws Exception {
+ CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, CacheAtomicityMode.TRANSACTIONAL);
+
+ try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg)) {
+ cache.loadCache(null);
+ cache.get(1);
+ cache.put(1, 1);
+ cache.remove(1);
+ }
+
+ assertEquals(3, loadCacheCnt.get());
+ assertEquals(1, loadCnt.get());
+ assertEquals(1, writeCnt.get());
+ assertEquals(1, deleteCnt.get());
+ assertEquals(0, reuseCnt.get());
+
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testExplicitTransaction() throws Exception {
+ CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, CacheAtomicityMode.TRANSACTIONAL);
+
+ try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg)) {
+ try (Transaction tx = ignite(0).transactions().txStart()) {
+ cache.put(1, 1);
+ cache.put(2, 2);
+ cache.remove(3);
+ cache.remove(4);
+
+ tx.commit();
+ }
+ }
+
+ assertEquals(2, writeCnt.get());
+ assertEquals(2, deleteCnt.get());
+ assertEquals(3, reuseCnt.get());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCrossCacheTransaction() throws Exception {
+ CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL);
+ CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL);
+
+ try (
+ IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
+ IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2)
+ ) {
+ try (Transaction tx = ignite(0).transactions().txStart()) {
+ cache1.put(1, 1);
+ cache2.put(2, 2);
+ cache1.remove(3);
+ cache2.remove(4);
+
+ tx.commit();
+ }
+ }
+
+ assertEquals(2, writeCnt.get());
+ assertEquals(2, deleteCnt.get());
+ assertEquals(3, reuseCnt.get());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCommit() throws Exception {
+ write.set(true);
+
+ CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL);
+ CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL);
+
+ try (
+ IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
+ IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2)
+ ) {
+ try (Transaction tx = ignite(0).transactions().txStart()) {
+ cache1.put(1, 1);
+ cache2.put(2, 2);
+
+ tx.commit();
+ }
+ }
+
+ try (Connection conn = DriverManager.getConnection(URL)) {
+ checkTable(conn, 1, false);
+ checkTable(conn, 2, false);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRollback() throws Exception {
+ write.set(true);
+ fail.set(true);
+
+ CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL);
+ CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL);
+
+ try (
+ IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
+ IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2)
+ ) {
+ try (Transaction tx = ignite(0).transactions().txStart()) {
+ cache1.put(1, 1);
+ cache2.put(2, 2);
+
+ tx.commit();
+
+ assert false : "Exception was not thrown.";
+ }
+ catch (IgniteException e) {
+ CacheWriterException we = X.cause(e, CacheWriterException.class);
+
+ assertNotNull(we);
+
+ assertEquals("Expected failure.", we.getMessage());
+ }
+ }
+
+ try (Connection conn = DriverManager.getConnection(URL)) {
+ checkTable(conn, 1, true);
+ checkTable(conn, 2, true);
+ }
+ }
+
+ /**
+ * @param conn Connection.
+ * @param idx Table index.
+ * @param empty If table expected to be empty.
+ * @throws Exception In case of error.
+ */
+ private void checkTable(Connection conn, int idx, boolean empty) throws Exception {
+ ResultSet rs = conn.createStatement().executeQuery("SELECT key, value FROM Table" + idx);
+
+ int cnt = 0;
+
+ while (rs.next()) {
+ int key = rs.getInt(1);
+ int val = rs.getInt(2);
+
+ assertEquals(idx, key);
+ assertEquals(idx, val);
+
+ cnt++;
+ }
+
+ assertEquals(empty ? 0 : 1, cnt);
+ }
+
+ /**
+ * @param name Cache name.
+ * @param atomicity Atomicity mode.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Integer, Integer> cacheConfiguration(String name, CacheAtomicityMode atomicity) {
+ CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>();
+
+ cfg.setName(name);
+ cfg.setAtomicityMode(atomicity);
+ cfg.setCacheStoreFactory(storeFactory());
+ cfg.setCacheStoreSessionListenerFactories(sessionListenerFactory());
+ cfg.setReadThrough(true);
+ cfg.setWriteThrough(true);
+ cfg.setLoadPreviousValue(true);
+
+ return cfg;
+ }
+
+ /**
+ * @return Store factory.
+ */
+ protected abstract Factory<? extends CacheStore<Integer, Integer>> storeFactory();
+
+ /**
+ * @return Session listener factory.
+ */
+ protected abstract Factory<CacheStoreSessionListener> sessionListenerFactory();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifecycleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifecycleSelfTest.java
new file mode 100644
index 0000000..814c8a5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifecycleSelfTest.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ * Store session listeners test.
+ */
+public class CacheStoreSessionListenerLifecycleSelfTest extends GridCommonAbstractTest {
+ /** IP finder installed into the discovery SPI of every grid configured by this test. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Listener callbacks in the order they fired; appended to by {@code SessionListener}, cleared before each test. */
+ private static final Queue<String> evts = new ConcurrentLinkedDeque<>();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+     TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+     discoSpi.setIpFinder(IP_FINDER);
+
+     IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+     // Grid-level ("Shared") listeners; the tests assert that they serve every cache
+     // that does not configure listeners of its own.
+     igniteCfg.setCacheStoreSessionListenerFactories(
+         new SessionListenerFactory("Shared 1"),
+         new SessionListenerFactory("Shared 2")
+     );
+     igniteCfg.setDiscoverySpi(discoSpi);
+
+     return igniteCfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ evts.clear(); // evts is static, so drop whatever an earlier test recorded.
+ }
+
+ /**
+  * Starts and stops a grid without creating any cache and checks that the shared
+  * listeners are started and then stopped, each pair in configuration order.
+  *
+  * @throws Exception If failed.
+  */
+ public void testNoCaches() throws Exception {
+     List<String> expEvts = Arrays.asList(
+         "Shared 1 START",
+         "Shared 2 START",
+         "Shared 1 STOP",
+         "Shared 2 STOP"
+     );
+
+     try {
+         startGrid();
+     }
+     finally {
+         stopGrid();
+     }
+
+     assertEqualsCollections(expEvts, evts);
+ }
+
+ /**
+  * Neither cache configures listeners of its own, so every store session is expected
+  * to be reported by the shared listeners only.
+  *
+  * @throws Exception If failed.
+  */
+ public void testNoOverride() throws Exception {
+     try {
+         Ignite ignite = startGrid();
+
+         for (String cacheName : Arrays.asList("cache-0", "cache-1")) {
+             CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(cacheName);
+
+             ccfg.setAtomicityMode(TRANSACTIONAL);
+
+             ignite.createCache(ccfg);
+         }
+
+         IgniteCache<Integer, Integer> cache0 = ignite.cache("cache-0");
+         IgniteCache<Integer, Integer> cache1 = ignite.cache("cache-1");
+
+         // Two standalone puts, one per cache.
+         cache0.put(1, 1);
+         cache1.put(1, 1);
+
+         // One explicit transaction spanning both caches.
+         try (Transaction tx = ignite.transactions().txStart()) {
+             cache0.put(2, 2);
+             cache0.put(3, 3);
+             cache1.put(2, 2);
+             cache1.put(3, 3);
+
+             tx.commit();
+         }
+     }
+     finally {
+         stopGrid();
+     }
+
+     List<String> expEvts = Arrays.asList(
+         "Shared 1 START",
+         "Shared 2 START",
+
+         // Put to cache-0.
+         "Shared 1 SESSION START cache-0",
+         "Shared 2 SESSION START cache-0",
+         "Shared 1 SESSION END cache-0",
+         "Shared 2 SESSION END cache-0",
+
+         // Put to cache-1.
+         "Shared 1 SESSION START cache-1",
+         "Shared 2 SESSION START cache-1",
+         "Shared 1 SESSION END cache-1",
+         "Shared 2 SESSION END cache-1",
+
+         // Transaction.
+         "Shared 1 SESSION START cache-0",
+         "Shared 2 SESSION START cache-0",
+         "Shared 1 SESSION START cache-1",
+         "Shared 2 SESSION START cache-1",
+         "Shared 1 SESSION END cache-0",
+         "Shared 2 SESSION END cache-0",
+         "Shared 1 SESSION END cache-1",
+         "Shared 2 SESSION END cache-1",
+
+         "Shared 1 STOP",
+         "Shared 2 STOP"
+     );
+
+     assertEqualsCollections(expEvts, evts);
+ }
+
+ /**
+  * {@code cache-0} configures its own pair of listeners while {@code cache-1} does not.
+  * Sessions on {@code cache-0} are expected to be reported only by its own listeners,
+  * and sessions on {@code cache-1} only by the shared ones.
+  *
+  * @throws Exception If failed.
+  */
+ public void testPartialOverride() throws Exception {
+     try {
+         Ignite ignite = startGrid();
+
+         for (int i = 0; i < 2; i++) {
+             String name = "cache-" + i;
+
+             // Parameterized rather than raw, matching the type returned by cacheConfiguration(String).
+             CacheConfiguration<Integer, Integer> cacheCfg = cacheConfiguration(name);
+
+             cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+             // Only the first cache overrides the shared listeners.
+             if (i == 0) {
+                 cacheCfg.setCacheStoreSessionListenerFactories(
+                     new SessionListenerFactory(name + " 1"),
+                     new SessionListenerFactory(name + " 2")
+                 );
+             }
+
+             ignite.createCache(cacheCfg);
+         }
+
+         ignite.cache("cache-0").put(1, 1);
+         ignite.cache("cache-1").put(1, 1);
+
+         try (Transaction tx = ignite.transactions().txStart()) {
+             ignite.cache("cache-0").put(2, 2);
+             ignite.cache("cache-0").put(3, 3);
+             ignite.cache("cache-1").put(2, 2);
+             ignite.cache("cache-1").put(3, 3);
+
+             tx.commit();
+         }
+     }
+     finally {
+         stopGrid();
+     }
+
+     assertEqualsCollections(Arrays.asList(
+         "Shared 1 START",
+         "Shared 2 START",
+         "cache-0 1 START",
+         "cache-0 2 START",
+
+         // Put to cache-0.
+         "cache-0 1 SESSION START cache-0",
+         "cache-0 2 SESSION START cache-0",
+         "cache-0 1 SESSION END cache-0",
+         "cache-0 2 SESSION END cache-0",
+
+         // Put to cache-1.
+         "Shared 1 SESSION START cache-1",
+         "Shared 2 SESSION START cache-1",
+         "Shared 1 SESSION END cache-1",
+         "Shared 2 SESSION END cache-1",
+
+         // Transaction.
+         "cache-0 1 SESSION START cache-0",
+         "cache-0 2 SESSION START cache-0",
+         "Shared 1 SESSION START cache-1",
+         "Shared 2 SESSION START cache-1",
+         "cache-0 1 SESSION END cache-0",
+         "cache-0 2 SESSION END cache-0",
+         "Shared 1 SESSION END cache-1",
+         "Shared 2 SESSION END cache-1",
+
+         "cache-0 1 STOP",
+         "cache-0 2 STOP",
+         "Shared 1 STOP",
+         "Shared 2 STOP"
+     ), evts);
+ }
+
+ /**
+  * Both caches configure their own pair of listeners, so the shared listeners are
+  * expected to see only grid start and stop, while every store session is reported
+  * by the listeners of the cache it belongs to.
+  *
+  * @throws Exception If failed.
+  */
+ public void testOverride() throws Exception {
+     try {
+         Ignite ignite = startGrid();
+
+         for (int i = 0; i < 2; i++) {
+             String name = "cache-" + i;
+
+             // Parameterized rather than raw, matching the type returned by cacheConfiguration(String).
+             CacheConfiguration<Integer, Integer> cacheCfg = cacheConfiguration(name);
+
+             cacheCfg.setCacheStoreSessionListenerFactories(
+                 new SessionListenerFactory(name + " 1"),
+                 new SessionListenerFactory(name + " 2")
+             );
+
+             ignite.createCache(cacheCfg);
+         }
+
+         ignite.cache("cache-0").put(1, 1);
+         ignite.cache("cache-1").put(1, 1);
+
+         try (Transaction tx = ignite.transactions().txStart()) {
+             ignite.cache("cache-0").put(2, 2);
+             ignite.cache("cache-0").put(3, 3);
+             ignite.cache("cache-1").put(2, 2);
+             ignite.cache("cache-1").put(3, 3);
+
+             tx.commit();
+         }
+     }
+     finally {
+         stopGrid();
+     }
+
+     assertEqualsCollections(Arrays.asList(
+         "Shared 1 START",
+         "Shared 2 START",
+         "cache-0 1 START",
+         "cache-0 2 START",
+         "cache-1 1 START",
+         "cache-1 2 START",
+
+         // Put to cache-0.
+         "cache-0 1 SESSION START cache-0",
+         "cache-0 2 SESSION START cache-0",
+         "cache-0 1 SESSION END cache-0",
+         "cache-0 2 SESSION END cache-0",
+
+         // Put to cache-1.
+         "cache-1 1 SESSION START cache-1",
+         "cache-1 2 SESSION START cache-1",
+         "cache-1 1 SESSION END cache-1",
+         "cache-1 2 SESSION END cache-1",
+
+         // Transaction.
+         "cache-0 1 SESSION START cache-0",
+         "cache-0 2 SESSION START cache-0",
+         "cache-1 1 SESSION START cache-1",
+         "cache-1 2 SESSION START cache-1",
+         "cache-0 1 SESSION END cache-0",
+         "cache-0 2 SESSION END cache-0",
+         "cache-1 1 SESSION END cache-1",
+         "cache-1 2 SESSION END cache-1",
+
+         "cache-0 1 STOP",
+         "cache-0 2 STOP",
+         "cache-1 1 STOP",
+         "cache-1 2 STOP",
+         "Shared 1 STOP",
+         "Shared 2 STOP"
+     ), evts);
+ }
+
+ /**
+  * Creates a transactional, write-through cache configuration backed by {@link Store}.
+  *
+  * @param name Cache name.
+  * @return Cache configuration.
+  */
+ private CacheConfiguration<Integer, Integer> cacheConfiguration(String name) {
+     CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(name);
+
+     // Store wiring first, then the atomicity mode.
+     ccfg.setCacheStoreFactory(FactoryBuilder.factoryOf(Store.class));
+     ccfg.setWriteThrough(true);
+     ccfg.setAtomicityMode(TRANSACTIONAL);
+
+     return ccfg;
+ }
+
+ /**
+  * Listener that appends every lifecycle and session callback to {@code evts},
+  * prefixed with its own name, so that tests can assert the exact callback order.
+  */
+ private static class SessionListener implements CacheStoreSessionListener, LifecycleAware {
+     /** Prefix identifying this listener in the recorded events. */
+     private final String name;
+
+     /** Grid instance; every callback asserts that it is set. */
+     @IgniteInstanceResource
+     private Ignite ignite;
+
+     /**
+      * @param name Name.
+      */
+     private SessionListener(String name) {
+         this.name = name;
+     }
+
+     /** {@inheritDoc} */
+     @Override public void start() throws IgniteException {
+         record(" START");
+     }
+
+     /** {@inheritDoc} */
+     @Override public void stop() throws IgniteException {
+         record(" STOP");
+     }
+
+     /** {@inheritDoc} */
+     @Override public void onSessionStart(CacheStoreSession ses) {
+         record(" SESSION START ", ses);
+     }
+
+     /** {@inheritDoc} */
+     @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+         record(" SESSION END ", ses);
+     }
+
+     /**
+      * Checks that the grid instance is set and records a lifecycle event.
+      *
+      * @param suffix Text appended after the listener name.
+      */
+     private void record(String suffix) {
+         assertNotNull(ignite);
+
+         evts.add(name + suffix);
+     }
+
+     /**
+      * Checks that the grid instance is set and records a session event.
+      *
+      * @param infix Text placed between the listener name and the cache name.
+      * @param ses Store session whose cache name is recorded.
+      */
+     private void record(String infix, CacheStoreSession ses) {
+         assertNotNull(ignite);
+
+         evts.add(name + infix + ses.cacheName());
+     }
+ }
+
+ /**
+  * Factory producing {@link SessionListener} instances that all carry the same name.
+  */
+ private static class SessionListenerFactory implements Factory<CacheStoreSessionListener> {
+     /** */
+     private static final long serialVersionUID = 0L;
+
+     /** Name handed to every listener this factory creates. */
+     private final String name;
+
+     /**
+      * @param name Name.
+      */
+     private SessionListenerFactory(String name) {
+         this.name = name;
+     }
+
+     /** {@inheritDoc} */
+     @Override public CacheStoreSessionListener create() {
+         return new SessionListener(name);
+     }
+ }
+
+ /**
+  * Store stub: loads nothing and ignores writes and deletes. The tests only observe
+  * the session listener callbacks, not any stored data.
+  * <p>
+  * The class is public and relies on its implicit public no-arg constructor; it is
+  * handed to {@code FactoryBuilder.factoryOf(Store.class)}, which presumably
+  * instantiates it reflectively.
+  */
+ public static class Store extends CacheStoreAdapter<Integer, Integer> {
+     /** {@inheritDoc} */
+     @Override public Integer load(Integer key) throws CacheLoaderException {
+         // Nothing is ever stored, so there is nothing to load.
+         return null;
+     }
+
+     /** {@inheritDoc} */
+     @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry)
+         throws CacheWriterException {
+         // No-op.
+     }
+
+     /** {@inheritDoc} */
+     @Override public void delete(Object key) throws CacheWriterException {
+         // No-op.
+     }
+ }
+}