You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2016/12/29 09:37:30 UTC
[26/50] [abbrv] ignite git commit: Merge with master - WIP.
Merge with master - WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30633571
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30633571
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30633571
Branch: refs/heads/ignite-3477
Commit: 30633571f878f794c5ba73beb9ecfd9c9f52aa7e
Parents: 9dc652d
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Thu Dec 22 17:28:04 2016 +0300
Committer: Ilya Lantukh <il...@gridgain.com>
Committed: Thu Dec 22 17:28:04 2016 +0300
----------------------------------------------------------------------
.../internal/client/GridClientClusterState.java | 33 +++++++++
.../router/impl/GridRouterClientImpl.java | 2 +-
.../GridCachePartitionExchangeManager.java | 4 +-
.../dht/GridClientPartitionTopology.java | 70 ++++++++++----------
.../dht/GridDhtPartitionTopology.java | 18 +++--
.../dht/GridDhtPartitionTopologyImpl.java | 54 ++++++++++++---
.../GridNearAtomicAbstractUpdateRequest.java | 2 -
.../GridDhtPartitionsExchangeFuture.java | 2 +-
.../IgniteCacheFullApiSelfTestSuite.java | 2 -
9 files changed, 127 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/30633571/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientClusterState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientClusterState.java b/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientClusterState.java
new file mode 100644
index 0000000..4fa25ce
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientClusterState.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+/**
+ * Interface for manage state of grid cluster.
+ */
+public interface GridClientClusterState {
+ /**
+ * @param active {@code True} activate, {@code False} deactivate.
+ */
+ public void active(boolean active) throws GridClientException;
+
+ /**
+ * @return {@code Boolean} - Current cluster state. {@code True} active, {@code False} inactive.
+ */
+ public boolean active() throws GridClientException;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/30633571/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterClientImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterClientImpl.java
index 1dd366b..12e2cc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterClientImpl.java
@@ -22,11 +22,11 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.client.GridClientClosedException;
+import org.apache.ignite.internal.client.GridClientClusterState;
import org.apache.ignite.internal.client.GridClientCompute;
import org.apache.ignite.internal.client.GridClientConfiguration;
import org.apache.ignite.internal.client.GridClientData;
import org.apache.ignite.internal.client.GridClientException;
-import org.apache.ignite.internal.client.GridClientClusterState;
import org.apache.ignite.internal.client.GridClientNode;
import org.apache.ignite.internal.client.GridClientPredicate;
import org.apache.ignite.internal.client.GridClientProtocol;
http://git-wip-us.apache.org/repos/asf/ignite/blob/30633571/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 6a9ea1b..c63da75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1243,7 +1243,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
top = cacheCtx.topology();
if (top != null)
- updated |= top.update(null, entry.getValue(), null);
+ updated |= top.update(null, entry.getValue(), null) != null;
}
if (!cctx.kernalContext().clientNode() && updated)
@@ -1292,7 +1292,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
top = cacheCtx.topology();
if (top != null) {
- updated |= top.update(null, entry.getValue(), null, true);
+ updated |= top.update(null, entry.getValue(), null) != null;
cctx.affinity().checkRebalanceState(top, cacheId);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/30633571/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 33de577..ca71f51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -550,7 +550,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionFullMap partMap,
Map<Integer, T2<Long, Long>> cntrMap) {
if (log.isDebugEnabled())
@@ -564,7 +564,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ']');
- return false;
+ return null;
}
if (node2part != null && node2part.compareTo(partMap) >= 0) {
@@ -572,7 +572,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
- return false;
+ return null;
}
updateSeq.incrementAndGet();
@@ -635,7 +635,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
if (log.isDebugEnabled())
log.debug("Partition map after full update: " + fullMapString());
- return false;
+ return null;
}
finally {
lock.writeLock().unlock();
@@ -643,10 +643,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
+ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
+ @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionMap2 parts,
- Map<Integer, T2<Long, Long>> cntrMap,
- boolean checkEvictions) {
+ Map<Integer, T2<Long, Long>> cntrMap) {
if (log.isDebugEnabled())
log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
@@ -655,21 +655,21 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
", parts=" + parts + ']');
- return false;
+ return null;
}
lock.writeLock().lock();
try {
if (stopping)
- return false;
+ return null;
if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
if (log.isDebugEnabled())
log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ']');
- return false;
+ return null;
}
if (exchId != null)
@@ -686,45 +686,43 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId +
", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']');
- return false;
+ return null;
}
long updateSeq = this.updateSeq.incrementAndGet();
- node2part.updateSequence(updateSeq);
+ node2part = new GridDhtPartitionFullMap(node2part, updateSeq);
- boolean changed = cur == null || !cur.equals(parts);
+ boolean changed = false;
- if (changed) {
- node2part.put(parts.nodeId(), parts);
+ if (cur == null || !cur.equals(parts))
+ changed = true;
- // Add new mappings.
- for (Integer p : parts.keySet()) {
- Set<UUID> ids = part2node.get(p);
+ node2part.put(parts.nodeId(), parts);
- if (ids == null)
- // Initialize HashSet to size 3 in anticipation that there won't be
- // more than 3 nodes per partition.
- part2node.put(p, ids = U.newHashSet(3));
+ part2node = new HashMap<>(part2node);
- ids.add(parts.nodeId());
- }
+ // Add new mappings.
+ for (Integer p : parts.keySet()) {
+ Set<UUID> ids = part2node.get(p);
+
+ if (ids == null)
+ // Initialize HashSet to size 3 in anticipation that there won't be
+ // more than 3 nodes per partition.
+ part2node.put(p, ids = U.newHashSet(3));
- // Remove obsolete mappings.
- if (cur != null) {
- for (Integer p : cur.keySet()) {
- if (parts.containsKey(p))
- continue;
+ changed |= ids.add(parts.nodeId());
+ }
- Set<UUID> ids = part2node.get(p);
+ // Remove obsolete mappings.
+ if (cur != null) {
+ for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
+ Set<UUID> ids = part2node.get(p);
- if (ids != null)
- ids.remove(parts.nodeId());
- }
+ if (ids != null)
+ changed |= ids.remove(parts.nodeId());
}
}
- else
- cur.updateSequence(parts.updateSequence(), parts.topologyVersion());
if (cntrMap != null) {
for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
@@ -740,7 +738,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
if (log.isDebugEnabled())
log.debug("Partition map after single update: " + fullMapString());
- return changed;
+ return changed ? localPartitionMap() : null;
}
finally {
lock.writeLock().unlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/30633571/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 5918da8..ac3e2c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -216,9 +216,9 @@ public interface GridDhtPartitionTopology {
* @param exchId Exchange ID.
* @param partMap Update partition map.
* @param cntrMap Partition update counters.
- * @return {@code True} if topology state changed.
+ * @return Local partition map if there were evictions or {@code null} otherwise.
*/
- public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
+ public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionFullMap partMap,
@Nullable Map<Integer, T2<Long, Long>> cntrMap);
@@ -226,13 +226,11 @@ public interface GridDhtPartitionTopology {
* @param exchId Exchange ID.
* @param parts Partitions.
* @param cntrMap Partition update counters.
- * @param checkEvictions Check evictions flag.
- * @return {@code True} if topology state changed.
+ * @return Local partition map if there were evictions or {@code null} otherwise.
*/
- @Nullable public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Nullable public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionMap2 parts,
- @Nullable Map<Integer, T2<Long, Long>> cntrMap,
- boolean checkEvictions);
+ @Nullable Map<Integer, T2<Long, Long>> cntrMap);
/**
* Checks if there is at least one owner for each partition in the cache topology.
@@ -279,6 +277,12 @@ public interface GridDhtPartitionTopology {
public void onEvicted(GridDhtLocalPartition part, boolean updateSeq);
/**
+ * @param nodeId Node to get partitions for.
+ * @return Partitions for node.
+ */
+ @Nullable public GridDhtPartitionMap2 partitions(UUID nodeId);
+
+ /**
* Prints memory stats.
*
* @param threshold Threshold for number of entries.
http://git-wip-us.apache.org/repos/asf/ignite/blob/30633571/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index ea3b7c6..f22c263 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1429,7 +1429,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
boolean marked = plc == PartitionLossPolicy.IGNORE ? locPart.own() : locPart.markLost();
if (marked)
- updateLocal(locPart.id(), cctx.localNodeId(), locPart.state(), updSeq);
+ updateLocal(locPart.id(), locPart.state(), updSeq);
changed |= marked;
}
@@ -1494,7 +1494,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
boolean marked = locPart.own();
if (marked)
- updateLocal(locPart.id(), cctx.localNodeId(), locPart.state(), updSeq);
+ updateLocal(locPart.id(), locPart.state(), updSeq);
}
for (UUID nodeId : nodeIds) {
@@ -1582,6 +1582,42 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
/**
* @param updateSeq Update sequence.
+ * @return {@code True} if state changed.
+ */
+ private boolean checkEvictions(long updateSeq) {
+ AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+
+ boolean changed = false;
+
+ if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
+ List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+
+ changed = checkEvictions(updateSeq, aff);
+
+ updateRebalanceVersion(aff);
+ }
+
+ return changed;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void checkEvictions() {
+ lock.writeLock().lock();
+
+ try {
+ long updateSeq = this.updateSeq.incrementAndGet();
+
+ node2part.newUpdateSequence(updateSeq);
+
+ checkEvictions(updateSeq);
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * @param updateSeq Update sequence.
* @param aff Affinity assignments.
* @return Checks if any of the local partitions need to be evicted.
*/
@@ -1824,15 +1860,15 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
/** {@inheritDoc} */
@Nullable @Override public GridDhtPartitionMap2 partitions(UUID nodeId) {
- lock.readLock().lock();
+ lock.readLock().lock();
- try {
- return node2part.get(nodeId);
- }
- finally {
- lock.readLock().unlock();
- }
+ try {
+ return node2part.get(nodeId);
}
+ finally {
+ lock.readLock().unlock();
+ }
+ }
/** {@inheritDoc} */
@Override public Map<Integer, T2<Long, Long>> updateCounters(boolean skipZeros) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/30633571/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index d5f8b64..b933186 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -133,8 +133,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
*/
public abstract boolean hasPrimary();
- public abstract boolean recovery();
-
/**
* @param res Response.
* @return {@code True} if current response was {@code null}.
http://git-wip-us.apache.org/repos/asf/ignite/blob/30633571/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 5d90e35..5bb7536 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1746,7 +1746,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (cacheCtx != null)
cacheCtx.topology().update(exchId, entry.getValue(), cntrMap);
else {
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
if (oldest != null && oldest.isLocal())
cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap);
http://git-wip-us.apache.org/repos/asf/ignite/blob/30633571/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
index 17757ab..7d4fee1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
@@ -57,7 +57,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAto
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderFairAffinityMultiNodeFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderMultiNodeFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderMultiNodeP2PDisabledFullApiSelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderOffHeapFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderOffHeapTieredFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWrityOrderOffHeapMultiNodeFullApiSelfTest;
@@ -231,7 +230,6 @@ public class IgniteCacheFullApiSelfTestSuite extends TestSuite {
suite.addTestSuite(GridCachePartitionedFullApiMultithreadedSelfTest.class);
// Disabled striped pool.
- suite.addTestSuite(GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.class);
suite.addTestSuite(GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.class);
// Other.