You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/01/26 14:44:03 UTC
ignite git commit: finalizing changes
Repository: ignite
Updated Branches:
refs/heads/ignite-comm-balance-master abb95fac5 -> 1b88631d1
finalizing changes
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1b88631d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1b88631d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1b88631d
Branch: refs/heads/ignite-comm-balance-master
Commit: 1b88631d1288d3969f4e6b821d3ff58f41a835ef
Parents: abb95fa
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Thu Jan 26 17:43:52 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Thu Jan 26 17:43:52 2017 +0300
----------------------------------------------------------------------
.../client/util/GridClientConsistentHash.java | 14 +-
.../ignite/internal/client/util/MpscQueue.java | 289 -------------------
.../managers/communication/GridIoManager.java | 7 +
.../discovery/GridDiscoveryManager.java | 45 ++-
.../affinity/GridAffinityAssignmentCache.java | 8 +-
.../cache/CacheAffinitySharedManager.java | 2 +-
.../processors/cache/GridCacheAdapter.java | 18 --
.../processors/cache/GridCacheIoManager.java | 7 +
.../processors/cache/IgniteCacheProxy.java | 8 -
.../dht/GridClientPartitionTopology.java | 1 -
.../dht/GridDhtAssignmentFetchFuture.java | 7 +-
.../dht/GridDhtPartitionTopologyImpl.java | 3 -
.../dht/atomic/GridDhtAtomicCache.java | 15 +-
.../GridNearAtomicSingleUpdateFuture.java | 77 +++--
.../dht/atomic/GridNearAtomicUpdateFuture.java | 78 +++--
.../cache/transactions/IgniteTxHandler.java | 4 +-
.../clock/GridClockSyncProcessor.java | 52 ++--
.../ignite/internal/util/StripedExecutor.java | 42 +--
.../discovery/GridDiscoveryManagerSelfTest.java | 116 --------
19 files changed, 151 insertions(+), 642 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java b/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java
index 8134906..0c9a3fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java
@@ -439,13 +439,9 @@ public class GridClientConsistentHash<N> {
/** {@inheritDoc} */
@Override public String toString() {
- StringBuilder sb = new StringBuilder(getClass().getSimpleName());
-
- sb.append(" [affSeed=").append(affSeed).
- append(", circle=").append(circle).
- append(", nodesComp=").append(nodesComp).
- append(", nodes=").append(nodes).append("]");
-
- return sb.toString();
+ return getClass().getSimpleName() + " [affSeed=" + affSeed +
+ ", circle=" + circle +
+ ", nodesComp=" + nodesComp +
+ ", nodes=" + nodes + "]";
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/client/util/MpscQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/util/MpscQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/client/util/MpscQueue.java
deleted file mode 100644
index 8821f66..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/util/MpscQueue.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.client.util;
-
-import java.util.AbstractQueue;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.internal.util.typedef.internal.A;
-
-import static java.util.concurrent.locks.LockSupport.park;
-import static java.util.concurrent.locks.LockSupport.unpark;
-
-/**
- * Multi producer single consumer queue.
- */
-public class MpscQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
- static final int INITIAL_ARRAY_SIZE = 512;
- static final Node BLOCKED = new Node();
-
- final AtomicReference<Node> putStack = new AtomicReference<Node>();
- private final AtomicInteger takeStackSize = new AtomicInteger();
-
- private Thread consumerThread;
- private Object[] takeStack = new Object[INITIAL_ARRAY_SIZE];
- private int takeStackIndex = -1;
-
- static int nextPowerOfTwo(final int value) {
- return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
- }
-
- public void setConsumerThread(Thread consumerThread) {
- this.consumerThread = consumerThread;
- }
-
- /**
- * {@inheritDoc}.
- *
- * This call is threadsafe; but it will only remove the items that are on the put-stack.
- */
- @Override
- public void clear() {
- putStack.set(BLOCKED);
- }
-
- @Override
- public boolean offer(E item) {
- A.notNull(item, "item can't be null");
-
- AtomicReference<Node> putStack = this.putStack;
- Node newHead = new Node();
- newHead.item = item;
-
- for (; ; ) {
- Node oldHead = putStack.get();
- if (oldHead == null || oldHead == BLOCKED) {
- newHead.next = null;
- newHead.size = 1;
- } else {
- newHead.next = oldHead;
- newHead.size = oldHead.size + 1;
- }
-
- if (!putStack.compareAndSet(oldHead, newHead)) {
- continue;
- }
-
- if (oldHead == BLOCKED) {
- unpark(consumerThread);
- }
-
- return true;
- }
- }
-
- @Override
- public E peek() {
- E item = peekNext();
- if (item != null) {
- return item;
- }
- if (!drainPutStack()) {
- return null;
- }
- return peekNext();
- }
-
- @Override
- public E take() throws InterruptedException {
- E item = next();
- if (item != null) {
- return item;
- }
-
- takeAll();
- assert takeStackIndex == 0;
- assert takeStack[takeStackIndex] != null;
-
- return next();
- }
-
- @Override
- public E poll() {
- E item = next();
-
- if (item != null) {
- return item;
- }
-
- if (!drainPutStack()) {
- return null;
- }
-
- return next();
- }
-
- private E next() {
- E item = peekNext();
- if (item != null) {
- dequeue();
- }
- return item;
- }
-
- private E peekNext() {
- if (takeStackIndex == -1) {
- return null;
- }
-
- if (takeStackIndex == takeStack.length) {
- takeStackIndex = -1;
- return null;
- }
-
- E item = (E) takeStack[takeStackIndex];
- if (item == null) {
- takeStackIndex = -1;
- return null;
- }
- return item;
- }
-
- private void dequeue() {
- takeStack[takeStackIndex] = null;
- takeStackIndex++;
- takeStackSize.lazySet(takeStackSize.get() - 1);
- }
-
- private void takeAll() throws InterruptedException {
- AtomicReference<Node> putStack = this.putStack;
- for (; ; ) {
- if (consumerThread.isInterrupted()) {
- putStack.compareAndSet(BLOCKED, null);
- throw new InterruptedException();
- }
-
- Node currentPutStackHead = putStack.get();
-
- if (currentPutStackHead == null) {
- // there is nothing to be take, so lets block.
- if (!putStack.compareAndSet(null, BLOCKED)) {
- // we are lucky, something is available
- continue;
- }
-
- // lets block for real.
- park();
- } else if (currentPutStackHead == BLOCKED) {
- park();
- } else {
- if (!putStack.compareAndSet(currentPutStackHead, null)) {
- continue;
- }
-
- copyIntoTakeStack(currentPutStackHead);
- break;
- }
- }
- }
-
- private boolean drainPutStack() {
- for (; ; ) {
- Node head = putStack.get();
- if (head == null) {
- return false;
- }
-
- if (putStack.compareAndSet(head, null)) {
- copyIntoTakeStack(head);
- return true;
- }
- }
- }
-
- private void copyIntoTakeStack(Node putStackHead) {
- int putStackSize = putStackHead.size;
-
- takeStackSize.lazySet(putStackSize);
-
- if (putStackSize > takeStack.length) {
- takeStack = new Object[nextPowerOfTwo(putStackHead.size)];
- }
-
- for (int i = putStackSize - 1; i >= 0; i--) {
- takeStack[i] = putStackHead.item;
- putStackHead = putStackHead.next;
- }
-
- takeStackIndex = 0;
- assert takeStack[0] != null;
- }
-
- /**
- * {@inheritDoc}.
- *
- * Best effort implementation.
- */
- @Override
- public int size() {
- Node h = putStack.get();
- int putStackSize = h == null ? 0 : h.size;
- return putStackSize + takeStackSize.get();
- }
-
- @Override
- public boolean isEmpty() {
- return size() == 0;
- }
-
- @Override
- public void put(E e) throws InterruptedException {
- offer(e);
- }
-
- @Override
- public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
- add(e);
- return true;
- }
-
- @Override
- public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int remainingCapacity() {
- return Integer.MAX_VALUE;
- }
-
- @Override
- public int drainTo(Collection<? super E> c) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int drainTo(Collection<? super E> c, int maxElements) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Iterator<E> iterator() {
- throw new UnsupportedOperationException();
- }
-
- private static final class Node<E> {
- Node next;
- E item;
- int size;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 56cefdf..d38b8f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -630,6 +630,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
assert nodeId != null;
assert msg != null;
+ Lock busyLock0 = busyLock.readLock();
+
+ busyLock0.lock();
+
try {
if (stopping) {
if (log.isDebugEnabled())
@@ -716,6 +720,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
catch (IgniteCheckedException e) {
U.error(log, "Failed to process message (will ignore): " + msg, e);
}
+ finally {
+ busyLock0.unlock();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 9ecd78e..a436d4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
import org.apache.ignite.internal.processors.security.SecurityContext;
@@ -1573,7 +1574,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Collection of cache nodes.
*/
public Collection<ClusterNode> nodes(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(topVer).allNodes();
+ return resolveDiscoCache(CU.cacheId(null), topVer).allNodes();
}
/**
@@ -1581,7 +1582,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return All server nodes for given topology version.
*/
public List<ClusterNode> serverNodes(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(topVer).srvNodes;
+ return resolveDiscoCache(CU.cacheId(null), topVer).srvNodes;
}
/**
@@ -1592,7 +1593,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Node.
*/
public ClusterNode node(AffinityTopologyVersion topVer, UUID id) {
- return resolveDiscoCache(topVer).node(id);
+ return resolveDiscoCache(CU.cacheId(null), topVer).node(id);
}
/**
@@ -1603,7 +1604,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Collection of cache nodes.
*/
public Collection<ClusterNode> cacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(topVer).cacheNodes(cacheName, topVer.topologyVersion());
+ return resolveDiscoCache(CU.cacheId(cacheName), topVer).cacheNodes(cacheName, topVer.topologyVersion());
}
/**
@@ -1614,7 +1615,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Collection of cache nodes.
*/
public Collection<ClusterNode> cacheNodes(int cacheId, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(topVer).cacheNodes(cacheId, topVer.topologyVersion());
+ return resolveDiscoCache(cacheId, topVer).cacheNodes(cacheId, topVer.topologyVersion()); // TODO
}
/**
@@ -1624,7 +1625,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Collection of cache nodes.
*/
public Collection<ClusterNode> cacheNodes(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(topVer).allNodesWithCaches(topVer.topologyVersion());
+ return resolveDiscoCache(CU.cacheId(null), topVer).allNodesWithCaches(topVer.topologyVersion());
}
/**
@@ -1634,7 +1635,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Collection of cache nodes.
*/
public Collection<ClusterNode> remoteCacheNodes(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(topVer).remoteCacheNodes(topVer.topologyVersion());
+ return resolveDiscoCache(CU.cacheId(null), topVer).remoteCacheNodes(topVer.topologyVersion());
}
/**
@@ -1642,7 +1643,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Oldest alive server nodes with at least one cache configured.
*/
@Nullable public ClusterNode oldestAliveCacheServerNode(AffinityTopologyVersion topVer) {
- DiscoCache cache = resolveDiscoCache(topVer);
+ DiscoCache cache = resolveDiscoCache(CU.cacheId(null), topVer);
Map.Entry<ClusterNode, Boolean> e = cache.aliveSrvNodesWithCaches.firstEntry();
@@ -1657,7 +1658,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Collection of cache affinity nodes.
*/
public Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(topVer).cacheAffinityNodes(CU.cacheId(cacheName), topVer.topologyVersion());
+ int cacheId = CU.cacheId(cacheName);
+
+ return resolveDiscoCache(cacheId, topVer).cacheAffinityNodes(cacheId, topVer.topologyVersion());
}
/**
@@ -1668,7 +1671,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Collection of cache affinity nodes.
*/
public Collection<ClusterNode> cacheAffinityNodes(int cacheId, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(topVer).cacheAffinityNodes(cacheId, topVer.topologyVersion());
+ return resolveDiscoCache(cacheId, topVer).cacheAffinityNodes(cacheId, topVer.topologyVersion());
}
/**
@@ -1738,17 +1741,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * Checks if cache with given name has at least one node with near cache enabled.
- *
- * @param cacheName Cache name.
- * @param topVer Topology version.
- * @return {@code True} if cache with given name has at least one node with near cache enabled.
- */
- public boolean hasNearCache(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(topVer).hasNearCache(CU.cacheId(cacheName));
- }
-
- /**
* Checks if cache with given ID has at least one node with near cache enabled.
*
* @param cacheId Cache ID.
@@ -1756,23 +1748,28 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if cache with given name has at least one node with near cache enabled.
*/
public boolean hasNearCache(int cacheId, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(topVer).hasNearCache(cacheId);
+ return resolveDiscoCache(cacheId, topVer).hasNearCache(cacheId);
}
/**
* Gets discovery cache for given topology version.
*
+ * @param cacheId Cache ID (participates in exception message).
* @param topVer Topology version.
* @return Discovery cache.
*/
- private DiscoCache resolveDiscoCache(AffinityTopologyVersion topVer) {
+ private DiscoCache resolveDiscoCache(int cacheId, AffinityTopologyVersion topVer) {
Snapshot snap = topSnap.get();
DiscoCache cache = AffinityTopologyVersion.NONE.equals(topVer) || topVer.equals(snap.topVer) ?
snap.discoCache : discoCacheHist.get(topVer);
if (cache == null) {
- throw new IgniteException("Failed to resolve nodes topology [topVer=" + topVer +
+ DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheId);
+
+ throw new IgniteException("Failed to resolve nodes topology [" +
+ "cacheName=" + (desc != null ? desc.cacheConfiguration().getName() : "N/A") +
+ ", topVer=" + topVer +
", history=" + discoCacheHist.keySet() +
", snap=" + snap +
", locNode=" + ctx.discovery().localNode() + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a388c7a..144b162 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -63,7 +63,7 @@ public class GridAffinityAssignmentCache {
private final String cacheName;
/** */
- private final Integer cacheId;
+ private final int cacheId;
/** Number of backups. */
private final int backups;
@@ -169,7 +169,7 @@ public class GridAffinityAssignmentCache {
/**
* @return Cache ID.
*/
- public Integer cacheId() {
+ public int cacheId() {
return cacheId;
}
@@ -266,7 +266,7 @@ public class GridAffinityAssignmentCache {
List<ClusterNode> sorted;
if (!locCache) {
- sorted = new ArrayList<>(ctx.discovery().cacheAffinityNodes(cacheName, topVer));
+ sorted = new ArrayList<>(ctx.discovery().cacheAffinityNodes(cacheId(), topVer));
Collections.sort(sorted, GridNodeOrderComparator.INSTANCE);
}
@@ -617,4 +617,4 @@ public class GridAffinityAssignmentCache {
return S.toString(AffinityReadyFuture.class, this);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 2890887..7bf5fd8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -843,7 +843,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
return true;
// If local node did not initiate exchange or local node is the only cache node in grid.
- Collection<ClusterNode> affNodes = cctx.discovery().cacheAffinityNodes(aff.cacheName(), fut.topologyVersion());
+ Collection<ClusterNode> affNodes = cctx.discovery().cacheAffinityNodes(aff.cacheId(), fut.topologyVersion());
DynamicCacheDescriptor cacheDesc = registeredCaches.get(aff.cacheId());
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 68a8d1c..e414160 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -288,9 +288,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** Asynchronous operations limit semaphore. */
private Semaphore asyncOpsSem;
- /** */
- protected volatile boolean asyncToggled;
-
/** {@inheritDoc} */
@Override public String name() {
return cacheCfg.getName();
@@ -367,18 +364,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
- * Toggles async flag if someone calls {@code withAsync()}
- * on proxy and since that we have to properly handle all cache
- * operations (sync and async) to put them in proper sequence.
- *
- * TODO: https://issues.apache.org/jira/browse/IGNITE-4393
- */
- void toggleAsync() {
- if (!asyncToggled)
- asyncToggled = true;
- }
-
- /**
* Prints memory stats.
*/
public void printMemoryStats() {
@@ -4461,9 +4446,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Failed future if waiting was interrupted.
*/
@Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire() {
- if (!asyncToggled)
- return null;
-
try {
if (asyncOpsSem != null)
asyncOpsSem.acquire();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index bdaf3a0..d20310b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -348,6 +349,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
@SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg,
final IgniteBiInClosure<UUID, GridCacheMessage> c) {
+ Lock lock = rw.readLock();
+
+ lock.lock();
+
try {
if (stopping) {
if (log.isDebugEnabled())
@@ -376,6 +381,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
finally {
if (depEnabled)
cctx.deploy().ignoreOwnership(false);
+
+ lock.unlock();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 3e157db..b0e25c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -334,14 +334,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/** {@inheritDoc} */
- @Override public IgniteCache<K, V> withAsync() {
- if (delegate instanceof GridCacheAdapter)
- ((GridCacheAdapter)delegate).toggleAsync();
-
- return super.withAsync();
- }
-
- /** {@inheritDoc} */
@Override public IgniteCache<K, V> withSkipStore() {
return skipStore();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 3f7fd0d..fe5be0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -217,7 +217,6 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public GridDhtTopologyFuture topologyVersionFuture() {
- // TODO
assert topReadyFut != null;
return topReadyFut;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index ab8e863..a79e024 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -79,9 +79,10 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
AffinityTopologyVersion topVer
) {
this.ctx = ctx;
- this.key = new T2<>(CU.cacheId(cacheName), topVer);
+ int cacheId = CU.cacheId(cacheName);
+ this.key = new T2<>(cacheId, topVer);
- Collection<ClusterNode> availableNodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer);
+ Collection<ClusterNode> availableNodes = ctx.discovery().cacheAffinityNodes(cacheId, topVer);
LinkedList<ClusterNode> tmp = new LinkedList<>();
@@ -220,4 +221,4 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
@Override public String toString() {
return S.toString(GridDhtAssignmentFetchFuture.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index b130ed9..9f4a079 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -825,9 +825,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
List<ClusterNode> affNodes = affAssignment.get(p);
- if (CU.cheatCache(cctx.cacheId()))
- return affNodes;
-
lock.readLock().lock();
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index ba0ea89..4809637 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1797,8 +1797,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
GridDhtPartitionTopology top = topology();
- if (!CU.cheatCache(ctx.cacheId()))
- top.readLock();
+ top.readLock();
try {
if (top.stopping()) {
@@ -1913,8 +1912,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
remap = true;
}
finally {
- if (!CU.cheatCache(ctx.cacheId()))
- top.readUnlock();
+ top.readUnlock();
}
}
catch (GridCacheEntryRemovedException e) {
@@ -2915,9 +2913,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
GridDhtCacheEntry entry = entryExx(key, topVer);
- if (CU.cheatCache(ctx.cacheId())) // TODO
- return Collections.singletonList(entry);
-
GridUnsafe.monitorEnter(entry);
if (entry.obsolete())
@@ -2993,9 +2988,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param topVer Topology version.
*/
private void unlockEntries(Collection<GridDhtCacheEntry> locked, AffinityTopologyVersion topVer) {
- if (CU.cheatCache(ctx.cacheId()))
- return;
-
// Process deleted entries before locks release.
assert ctx.deferredDelete() : this;
@@ -3197,7 +3189,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@SuppressWarnings("unchecked")
private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
if (msgLog.isDebugEnabled())
- msgLog.debug("Received near atomic update response [futId=" + res.futureVersion() + ", node=" + nodeId + ']');
+ msgLog.debug("Received near atomic update response " +
+ "[futId=" + res.futureVersion() + ", node=" + nodeId + ']');
res.nodeId(ctx.localNodeId());
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index da9cb40..133e42e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -382,60 +382,53 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
AffinityTopologyVersion topVer;
GridCacheVersion futVer;
- if (!CU.cheatCache(cctx.cacheId())) {
- cache.topology().readLock();
+ cache.topology().readLock();
- try {
- if (cache.topology().stopping()) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
- cache.name()));
+ try {
+ if (cache.topology().stopping()) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ cache.name()));
- return;
- }
+ return;
+ }
- GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+ GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
- if (fut.isDone()) {
- Throwable err = fut.validateCache(cctx);
+ if (fut.isDone()) {
+ Throwable err = fut.validateCache(cctx);
- if (err != null) {
- onDone(err);
+ if (err != null) {
+ onDone(err);
- return;
- }
+ return;
+ }
- topVer = fut.topologyVersion();
+ topVer = fut.topologyVersion();
- futVer = addAtomicFuture(topVer);
+ futVer = addAtomicFuture(topVer);
+ }
+ else {
+ if (waitTopFut) {
+ assert !topLocked : this;
+
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ mapOnTopology();
+ }
+ });
+ }
+ });
}
- else {
- if (waitTopFut) {
- assert !topLocked : this;
-
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- mapOnTopology();
- }
- });
- }
- });
- }
- else
- onDone(new GridCacheTryPutFailedException());
+ else
+ onDone(new GridCacheTryPutFailedException());
- return;
- }
- }
- finally {
- cache.topology().readUnlock();
+ return;
}
}
- else {
- topVer = cache.topology().topologyVersionFuture().topologyVersion();
-
- futVer = addAtomicFuture(topVer);
+ finally {
+ cache.topology().readUnlock();
}
if (futVer != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 9f6e761..da33fda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -491,60 +491,53 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
AffinityTopologyVersion topVer;
GridCacheVersion futVer;
- if (!CU.cheatCache(cctx.cacheId())) {
- cache.topology().readLock();
+ cache.topology().readLock();
- try {
- if (cache.topology().stopping()) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
- cache.name()));
+ try {
+ if (cache.topology().stopping()) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ cache.name()));
- return;
- }
+ return;
+ }
- GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+ GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
- if (fut.isDone()) {
- Throwable err = fut.validateCache(cctx);
+ if (fut.isDone()) {
+ Throwable err = fut.validateCache(cctx);
- if (err != null) {
- onDone(err);
+ if (err != null) {
+ onDone(err);
- return;
- }
+ return;
+ }
- topVer = fut.topologyVersion();
+ topVer = fut.topologyVersion();
- futVer = addAtomicFuture(topVer);
+ futVer = addAtomicFuture(topVer);
+ }
+ else {
+ if (waitTopFut) {
+ assert !topLocked : this;
+
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ mapOnTopology();
+ }
+ });
+ }
+ });
}
- else {
- if (waitTopFut) {
- assert !topLocked : this;
-
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- mapOnTopology();
- }
- });
- }
- });
- }
- else
- onDone(new GridCacheTryPutFailedException());
+ else
+ onDone(new GridCacheTryPutFailedException());
- return;
- }
- }
- finally {
- cache.topology().readUnlock();
+ return;
}
}
- else {
- topVer = cache.topology().topologyVersionFuture().topologyVersion();
-
- futVer = addAtomicFuture(topVer);
+ finally {
+ cache.topology().readUnlock();
}
if (futVer != null)
@@ -634,7 +627,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
updVer = this.updVer;
if (updVer == null) {
- //updVer = cctx.versions().next(topVer);
updVer = futVer;
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index eaf1c87..5df7c40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -514,8 +514,8 @@ public class IgniteTxHandler {
for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) {
GridCacheContext ctx = e.context();
- Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer);
- Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer);
+ Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), expVer);
+ Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), curVer);
if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0)
return true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
index 36178f3..257d0d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
@@ -273,33 +273,31 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
* @return Adjusted time.
*/
public long adjustedTime(long topVer) {
-// T2<GridClockDeltaVersion, GridClockDeltaSnapshot> fastSnap = lastSnapshot;
-//
-// GridClockDeltaSnapshot snap;
-//
-// if (fastSnap != null && fastSnap.get1().topologyVersion() == topVer)
-// snap = fastSnap.get2();
-// else {
-// // Get last synchronized time on given topology version.
-// Map.Entry<GridClockDeltaVersion, GridClockDeltaSnapshot> entry = timeSyncHistory().lowerEntry(
-// new GridClockDeltaVersion(0, topVer + 1));
-//
-// snap = entry == null ? null : entry.getValue();
-// }
-//
-// long now = clockSrc.currentTimeMillis();
-//
-// if (snap == null)
-// return now;
-//
-// Long delta = snap.deltas().get(ctx.localNodeId());
-//
-// if (delta == null)
-// delta = 0L;
-//
-// return now + delta;
-
- return System.currentTimeMillis();
+ T2<GridClockDeltaVersion, GridClockDeltaSnapshot> fastSnap = lastSnapshot;
+
+ GridClockDeltaSnapshot snap;
+
+ if (fastSnap != null && fastSnap.get1().topologyVersion() == topVer)
+ snap = fastSnap.get2();
+ else {
+ // Get last synchronized time on given topology version.
+ Map.Entry<GridClockDeltaVersion, GridClockDeltaSnapshot> entry = timeSyncHistory().lowerEntry(
+ new GridClockDeltaVersion(0, topVer + 1));
+
+ snap = entry == null ? null : entry.getValue();
+ }
+
+ long now = clockSrc.currentTimeMillis();
+
+ if (snap == null)
+ return now;
+
+ Long delta = snap.deltas().get(ctx.localNodeId());
+
+ if (delta == null)
+ delta = 0L;
+
+ return now + delta;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 39dde09..a653429 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.client.util.MpscQueue;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -75,7 +74,7 @@ public class StripedExecutor implements ExecutorService {
try {
for (int i = 0; i < cnt; i++) {
- stripes[i] = new StripeMPSC(
+ stripes[i] = new StripeConcurrentQueue(
gridName,
poolName,
i,
@@ -476,45 +475,6 @@ public class StripedExecutor implements ExecutorService {
}
}
- private static class StripeMPSC extends Stripe {
- private final MpscQueue<Runnable> q = new MpscQueue<>();
-
- public StripeMPSC(
- String gridName,
- String poolName,
- int idx,
- IgniteLogger log
- ) {
- super(
- gridName,
- poolName,
- idx,
- log);
- }
-
- @Override void start() {
- super.start();
-
- q.setConsumerThread(thread);
- }
-
- @Override void execute(Runnable cmd) {
- q.offer(cmd);
- }
-
- @Override Runnable take() throws InterruptedException {
- return q.take();
- }
-
- @Override int queueSize() {
- return q.size();
- }
-
- @Override String queueToString() {
- return q.toString();
- }
- }
-
/**
* Stripe.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java
index 230a3bc..5601254 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java
@@ -17,15 +17,8 @@
package org.apache.ignite.internal.managers.discovery;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -83,115 +76,6 @@ public abstract class GridDiscoveryManagerSelfTest extends GridCommonAbstractTes
}
/**
- * @throws Exception If failed.
- */
- public void testHasNearCache() throws Exception {
- IgniteKernal g0 = (IgniteKernal)startGrid(0); // PARTITIONED_ONLY cache.
-
- AffinityTopologyVersion none = new AffinityTopologyVersion(-1);
- AffinityTopologyVersion one = new AffinityTopologyVersion(1);
- AffinityTopologyVersion two = new AffinityTopologyVersion(2, 2);
- AffinityTopologyVersion three = new AffinityTopologyVersion(3);
- AffinityTopologyVersion four = new AffinityTopologyVersion(4);
- AffinityTopologyVersion five = new AffinityTopologyVersion(5);
-
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, none));
- assertFalse(g0.context().discovery().hasNearCache(null, none));
-
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
- assertFalse(g0.context().discovery().hasNearCache(null, one));
-
- IgniteKernal g1 = (IgniteKernal)startGrid(1); // NEAR_ONLY cache.
-
- grid(1).createNearCache(null, new NearCacheConfiguration());
-
- grid(1).createNearCache(CACHE_NAME, new NearCacheConfiguration());
-
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two));
- assertFalse(g0.context().discovery().hasNearCache(null, one));
- assertTrue(g0.context().discovery().hasNearCache(null, two));
-
- assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, two));
- assertTrue(g1.context().discovery().hasNearCache(null, two));
-
- IgniteKernal g2 = (IgniteKernal)startGrid(2); // PARTITIONED_ONLY cache.
-
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, three));
- assertFalse(g0.context().discovery().hasNearCache(null, one));
- assertTrue(g0.context().discovery().hasNearCache(null, two));
- assertTrue(g0.context().discovery().hasNearCache(null, three));
-
- assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, two));
- assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, three));
- assertTrue(g1.context().discovery().hasNearCache(null, two));
- assertTrue(g1.context().discovery().hasNearCache(null, three));
-
- assertTrue(g2.context().discovery().hasNearCache(CACHE_NAME, three));
- assertTrue(g2.context().discovery().hasNearCache(null, three));
-
- stopGrid(2);
-
- // Wait all nodes are on version 4.
- for (;;) {
- if (F.forAll(
- Ignition.allGrids(),
- new IgnitePredicate<Ignite>() {
- @Override public boolean apply(Ignite ignite) {
- return ignite.cluster().topologyVersion() == 4;
- }
- }))
- break;
-
- Thread.sleep(1000);
- }
-
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, three));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, four));
- assertFalse(g0.context().discovery().hasNearCache(null, one));
- assertTrue(g0.context().discovery().hasNearCache(null, two));
- assertTrue(g0.context().discovery().hasNearCache(null, three));
- assertTrue(g0.context().discovery().hasNearCache(null, four));
-
- assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, three));
- assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, four));
- assertTrue(g1.context().discovery().hasNearCache(null, three));
- assertTrue(g1.context().discovery().hasNearCache(null, four));
-
- stopGrid(1);
-
- // Wait all nodes are on version 5.
- for (;;) {
- if (F.forAll(
- Ignition.allGrids(),
- new IgnitePredicate<Ignite>() {
- @Override public boolean apply(Ignite ignite) {
- return ignite.cluster().topologyVersion() == 5;
- }
- }))
- break;
-
- Thread.sleep(1000);
- }
-
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, three));
- assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, four));
- assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, five));
-
- assertFalse(g0.context().discovery().hasNearCache(null, one));
- assertTrue(g0.context().discovery().hasNearCache(null, two));
- assertTrue(g0.context().discovery().hasNearCache(null, three));
- assertTrue(g0.context().discovery().hasNearCache(null, four));
- assertFalse(g0.context().discovery().hasNearCache(null, five));
- }
-
- /**
*
*/
public static class RegularDiscovery extends GridDiscoveryManagerSelfTest {