You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/28 18:21:34 UTC
[3/4] ignite git commit: Ignite-1093 Improved rebalancing
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
new file mode 100644
index 0000000..4fe2153
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -0,0 +1,1362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.timeout.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.thread.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+import static org.apache.ignite.internal.processors.dr.GridDrType.*;
+
+/**
+ * Thread pool for requesting partitions from other nodes and populating local cache.
+ */
+@SuppressWarnings("NonConstantFieldWithUpperCaseName")
+public class GridDhtPartitionDemander {
+ /** */
+ private final GridCacheContext<?, ?> cctx;
+
+ /** */
+ private final IgniteLogger log;
+
+ /** Preload predicate. */
+ private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+ /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
+ @GridToStringInclude
+ private volatile SyncFuture syncFut;
+
+ /** Last timeout object. */
+ private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
+
+ /** Last exchange future. */
+ private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
+
+ /** Demand lock. */
+ private final ReadWriteLock demandLock;
+
+    /**
+     * Creates a partition demander for the given cache context.
+     *
+     * @param cctx Cache context.
+     * @param demandLock Demand lock shared with the preloader (legacy workers take its read side while demanding).
+     */
+    public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx, ReadWriteLock demandLock) {
+        assert cctx != null;
+
+        this.cctx = cctx;
+        this.demandLock = demandLock;
+
+        log = cctx.logger(getClass());
+
+        // Rebalancing is effectively disabled on client nodes and for caches with rebalancing turned off.
+        boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
+
+        syncFut = new SyncFuture(null);
+
+        if (!enabled)
+            // Calling onDone() immediately since preloading is disabled.
+            syncFut.onDone();
+    }
+
+ /**
+ *
+ */
+ void start() {
+ }
+
+ /**
+ *
+ */
+ void stop() {
+ lastExchangeFut = null;
+
+ lastTimeoutObj.set(null);
+ }
+
+ /**
+ * @return Future for {@link CacheRebalanceMode#SYNC} mode.
+ */
+ IgniteInternalFuture<?> syncFuture() {
+ return syncFut;
+ }
+
+ /**
+ * Sets preload predicate for demand pool.
+ *
+ * @param preloadPred Preload predicate.
+ */
+ void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
+ this.preloadPred = preloadPred;
+ }
+
+    /**
+     * Forces a rebalancing round: removes any pending delayed-rebalance timeout object and,
+     * once the last observed exchange future completes, asks the exchange manager to run a
+     * forced preload exchange. No-op (besides a debug log) if no topology event happened yet.
+     */
+    void forcePreload() {
+        // Cancel a scheduled delayed rebalance, if any (see the delay > 0 branch of addAssignments()).
+        GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
+
+        if (obj != null)
+            cctx.time().removeTimeoutObject(obj);
+
+        final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
+
+        if (exchFut != null) {
+            if (log.isDebugEnabled())
+                log.debug("Forcing rebalance event for future: " + exchFut);
+
+            // Only trigger the forced preload after the exchange itself has finished.
+            exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                    cctx.shared().exchange().forcePreloadExchange(exchFut);
+                }
+            });
+        }
+        else if (log.isDebugEnabled())
+            log.debug("Ignoring force rebalance request (no topology event happened yet).");
+    }
+
+    /**
+     * @param topVer Topology version the current rebalance round was started on.
+     * @return {@code True} if the affinity topology version differs from {@code topVer},
+     *      i.e. the round is stale and should be cancelled.
+     */
+    private boolean topologyChanged(AffinityTopologyVersion topVer) {
+        return !cctx.affinity().affinityTopologyVersion().equals(topVer);
+    }
+
+ /**
+ * @param type Type.
+ * @param discoEvt Discovery event.
+ */
+ private void preloadEvent(int type, DiscoveryEvent discoEvt) {
+ preloadEvent(-1, type, discoEvt);
+ }
+
+ /**
+ * @param part Partition.
+ * @param type Type.
+ * @param discoEvt Discovery event.
+ */
+ private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
+ assert discoEvt != null;
+
+ cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+ }
+
+    /**
+     * Starts a new rebalancing round for the given assignments, or schedules a delayed one
+     * when a positive rebalance delay is configured and the request is not forced.
+     * Actual demanding runs on a dedicated thread and waits for the marshaller cache and
+     * lower-ordered caches to finish rebalancing first.
+     *
+     * @param assigns Assignments.
+     * @param force {@code True} if dummy reassign.
+     * @throws IgniteCheckedException If failed.
+     */
+    void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Adding partition assignments: " + assigns);
+
+        long delay = cctx.config().getRebalanceDelay();
+
+        if (delay == 0 || force) {
+            assert assigns != null;
+
+            final AffinityTopologyVersion topVer = assigns.topologyVersion();
+
+            SyncFuture fut = syncFut;
+
+            // Re-create the sync future if the current one already belongs to a previous round.
+            if (fut.isInited()) {
+                if (!fut.isDone())
+                    fut.onCancel();
+
+                fut = new SyncFuture(assigns);
+
+                syncFut = fut;
+            }
+            else
+                fut.init(assigns);
+
+            if (assigns.isEmpty()) {
+                fut.onDone();
+
+                return;
+            }
+
+            if (topologyChanged(topVer)) {
+                fut.onCancel();
+
+                return;
+            }
+
+            final SyncFuture cSF = fut;
+
+            new IgniteThread(cctx.gridName(), "demand-thread-" + cctx.cache().name(), new Runnable() {
+                @Override public void run() {
+                    // The marshaller cache must be rebalanced first: all other caches depend on it.
+                    if (!CU.isMarshallerCache(cctx.name())) {
+                        if (log.isDebugEnabled())
+                            log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
+
+                        try {
+                            IgniteInternalFuture<?> fut = cctx.kernalContext().cache().marshallerCache().preloader().syncFuture();
+
+                            if (!topologyChanged(topVer))
+                                fut.get();
+                            else {
+                                cSF.onCancel();
+
+                                return;
+                            }
+                        }
+                        catch (IgniteInterruptedCheckedException ignored) {
+                            // Fixed: cancellation and return were previously nested inside the
+                            // isDebugEnabled() check, so with debug logging off an interrupted wait
+                            // neither cancelled the future nor stopped this thread.
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
+                                    "[cacheName=" + cctx.name() + ']');
+
+                            cSF.onCancel();
+
+                            return;
+                        }
+                        catch (IgniteCheckedException e) {
+                            cSF.onCancel();
+
+                            throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
+                        }
+                    }
+
+                    // Honor rebalance order: wait for caches with a smaller order to finish first.
+                    int rebalanceOrder = cctx.config().getRebalanceOrder();
+
+                    if (rebalanceOrder > 0) {
+                        IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
+
+                        try {
+                            if (fut != null) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
+                                        ", rebalanceOrder=" + rebalanceOrder + ']');
+
+                                if (!topologyChanged(topVer))
+                                    fut.get();
+                                else {
+                                    cSF.onCancel();
+
+                                    return;
+                                }
+                            }
+                        }
+                        catch (IgniteInterruptedCheckedException ignored) {
+                            // Fixed: same log-level-dependent cancellation bug as above.
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
+                                    "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
+
+                            cSF.onCancel();
+
+                            return;
+                        }
+                        catch (IgniteCheckedException e) {
+                            cSF.onCancel();
+
+                            throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
+                        }
+                    }
+
+                    requestPartitions(cSF);
+                }
+            }).start();
+        }
+        else if (delay > 0) {
+            // Delayed rebalancing: (re)schedule a timeout object that will force preload later.
+            GridTimeoutObject obj = lastTimeoutObj.get();
+
+            if (obj != null)
+                cctx.time().removeTimeoutObject(obj);
+
+            final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
+
+            assert exchFut != null : "Delaying rebalance process without topology event.";
+
+            obj = new GridTimeoutObjectAdapter(delay) {
+                @Override public void onTimeout() {
+                    exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
+                            cctx.shared().exchange().forcePreloadExchange(exchFut);
+                        }
+                    });
+                }
+            };
+
+            lastTimeoutObj.set(obj);
+
+            cctx.time().addTimeoutObject(obj);
+        }
+    }
+
+    /**
+     * Sends initial demand messages for the given rebalance round: one per supplier node,
+     * split round-robin across rebalance listener topics for nodes supporting the V2
+     * rebalancing protocol, or handled by a legacy {@code DemandWorker} otherwise.
+     *
+     * @param fut Future of the current rebalance round.
+     */
+    private void requestPartitions(SyncFuture fut) {
+        final GridDhtPreloaderAssignments assigns = fut.assigns;
+
+        AffinityTopologyVersion topVer = fut.topologyVersion();
+
+        for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
+            if (topologyChanged(topVer)) {
+                fut.onCancel();
+
+                return;
+            }
+
+            final ClusterNode node = e.getKey();
+
+            GridDhtPartitionDemandMessage d = e.getValue();
+
+            d.timeout(cctx.config().getRebalanceTimeout());
+            d.workerId(0);//old api support.
+
+            final CacheConfiguration cfg = cctx.config();
+
+            final long start = U.currentTimeMillis();
+
+            fut.logStart(node.id(), start);
+
+            U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+                ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
+
+            // Check remote node rebalancing API version.
+            // Fixed: use Integer.valueOf() instead of the boxing constructor 'new Integer(1)'.
+            if (Integer.valueOf(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION))) {
+                GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
+
+                remainings.addAll(d.partitions());
+
+                fut.append(node.id(), remainings);
+
+                // Half of the rebalance pool serves demand listeners (the other half serves supply).
+                int lsnrCnt = Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2);
+
+                List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
+
+                for (int cnt = 0; cnt < lsnrCnt; cnt++)
+                    sParts.add(new HashSet<Integer>());
+
+                // Distribute partitions round-robin across listener topics.
+                Iterator<Integer> it = d.partitions().iterator();
+
+                int cnt = 0;
+
+                while (it.hasNext())
+                    sParts.get(cnt++ % lsnrCnt).add(it.next());
+
+                for (cnt = 0; cnt < lsnrCnt; cnt++) {
+                    if (!sParts.get(cnt).isEmpty()) {
+                        // Create copy.
+                        GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+
+                        initD.topic(GridCachePartitionExchangeManager.demanderTopic(cnt));
+
+                        try {
+                            if (!topologyChanged(topVer))
+                                cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
+                            else {
+                                fut.onCancel();
+
+                                // Fixed: the round is cancelled - stop requesting from further topics/nodes.
+                                return;
+                            }
+                        }
+                        catch (IgniteCheckedException ex) {
+                            fut.onCancel();
+
+                            U.error(log, "Failed to send partition demand message to node", ex);
+
+                            // Fixed: the round is cancelled - stop requesting from further topics/nodes.
+                            return;
+                        }
+
+                        if (log.isDebugEnabled())
+                            log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt)) + ")]");
+                    }
+                }
+            }
+            else {
+                // Legacy (pre-V2) protocol: dedicated worker per supplier node.
+                DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut);
+
+                fut.append(node.id(), d.partitions());
+
+                dw.run(node, d);
+            }
+        }
+    }
+
+    /**
+     * Renders a collection of partition ids as a sorted, comma-separated list with runs of
+     * consecutive ids collapsed into ranges, e.g. {@code [4, 1, 2, 8]} becomes {@code "1-2, 4, 8"}.
+     *
+     * @param c Partitions.
+     * @return String representation of partitions list.
+     */
+    private String partitionsList(Collection<Integer> c) {
+        LinkedList<Integer> s = new LinkedList<>(c);
+
+        Collections.sort(s);
+
+        StringBuilder sb = new StringBuilder();
+
+        // Start of the current consecutive run (-1 = no run started yet).
+        int start = -1;
+
+        // Previously seen partition id.
+        int prev = -1;
+
+        Iterator<Integer> sit = s.iterator();
+
+        while (sit.hasNext()) {
+            int p = sit.next();
+            if (start == -1) {
+                start = p;
+                prev = p;
+            }
+
+            // Gap detected: flush the finished run and start a new one at p.
+            if (prev < p - 1) {
+                sb.append(start);
+
+                if (start != prev)
+                    sb.append("-").append(prev);
+
+                sb.append(", ");
+
+                start = p;
+            }
+
+            // Last element: flush the trailing run.
+            if (!sit.hasNext()) {
+                sb.append(start);
+
+                if (start != p)
+                    sb.append("-").append(p);
+            }
+
+            prev = p;
+        }
+
+        return sb.toString();
+    }
+
+    /**
+     * Handles a supply message (V2 rebalancing protocol) from the given node: preloads the
+     * received entries into local MOVING partitions, records finished/missed partitions on
+     * the current sync future and, if the round is not yet done, requests the next batch
+     * from the same supplier over the same listener topic.
+     *
+     * @param idx Listener (topic) index the message arrived on.
+     * @param id Supplier node id.
+     * @param supply Supply message.
+     */
+    public void handleSupplyMessage(
+        int idx,
+        final UUID id,
+        final GridDhtPartitionSupplyMessageV2 supply) {
+        AffinityTopologyVersion topVer = supply.topologyVersion();
+
+        final SyncFuture fut = syncFut;
+
+        // Stale round: topology moved on since the demand message was sent.
+        if (topologyChanged(topVer)) {
+            fut.onCancel();
+
+            return;
+        }
+
+        ClusterNode node = cctx.node(id);
+
+        assert node != null;
+
+        if (log.isDebugEnabled())
+            log.debug("Received supply message: " + supply);
+
+        // Check whether there were class loading errors on unmarshal.
+        if (supply.classError() != null) {
+            if (log.isDebugEnabled())
+                log.debug("Class got undeployed during preloading: " + supply.classError());
+
+            fut.onCancel(id);
+
+            return;
+        }
+
+        final GridDhtPartitionTopology top = cctx.dht().topology();
+
+        try {
+            // Preload.
+            for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
+                if (topologyChanged(topVer)) {
+                    fut.onCancel();
+
+                    return;
+                }
+
+                int p = e.getKey();
+
+                if (cctx.affinity().localNode(p, topVer)) {
+                    GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+                    assert part != null;
+
+                    if (part.state() == MOVING) {
+                        boolean reserved = part.reserve();
+
+                        assert reserved : "Failed to reserve partition [gridName=" +
+                            cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+
+                        // Lock the partition against concurrent eviction while writing entries.
+                        part.lock();
+
+                        try {
+                            // Loop through all received entries and try to preload them.
+                            for (GridCacheEntryInfo entry : e.getValue().infos()) {
+                                if (!part.preloadingPermitted(entry.key(), entry.version())) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Preloading is not permitted for entry due to " +
+                                            "evictions [key=" + entry.key() +
+                                            ", ver=" + entry.version() + ']');
+
+                                    continue;
+                                }
+                                if (!preloadEntry(node, p, entry, topVer)) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Got entries for invalid partition during " +
+                                            "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
+
+                                    break;
+                                }
+                            }
+
+                            boolean last = supply.last().contains(p);
+
+                            // If message was last for this partition,
+                            // then we take ownership.
+                            if (last) {
+                                top.own(part);
+
+                                fut.onPartitionDone(id, p);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Finished rebalancing partition: " + part);
+                            }
+                        }
+                        finally {
+                            part.unlock();
+                            part.release();
+                        }
+                    }
+                    else {
+                        fut.onPartitionDone(id, p);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
+                    }
+                }
+                else {
+                    fut.onPartitionDone(id, p);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
+                }
+            }
+
+            // Only request partitions based on latest topology version.
+            for (Integer miss : supply.missed())
+                if (cctx.affinity().localNode(miss, topVer))
+                    fut.onMissedPartition(id, miss);
+
+            // Missed partitions are done from this supplier's perspective.
+            for (Integer miss : supply.missed())
+                fut.onPartitionDone(id, miss);
+
+            if (!fut.isDone()) {
+                GridDhtPartitionDemandMessage d = fut.getDemandMessage(node);
+
+                if (d != null) {
+                    // Create copy.
+                    GridDhtPartitionDemandMessage nextD =
+                        new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
+
+                    nextD.topic(GridCachePartitionExchangeManager.demanderTopic(idx));
+
+                    if (!topologyChanged(topVer)) {
+                        // Send demand message.
+                        cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(idx),
+                            nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+                    }
+                    else
+                        fut.onCancel();
+                }
+            }
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Node left during rebalancing [node=" + node.id() +
+                    ", msg=" + e.getMessage() + ']');
+            fut.onCancel();
+        }
+        catch (IgniteCheckedException ex) {
+            U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+                "fully finish) [node=" + node.id() + ", msg=" + supply + ']', ex);
+
+            fut.onCancel(node.id());
+        }
+    }
+
+    /**
+     * Preloads a single received entry into the local DHT cache via initialValue().
+     *
+     * @param pick Node picked for preloading.
+     * @param p Partition.
+     * @param entry Preloaded entry.
+     * @param topVer Topology version.
+     * @return {@code False} if partition has become invalid during preloading.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     * @throws IgniteCheckedException If caching the entry failed (rebalancing will stop).
+     */
+    private boolean preloadEntry(
+        ClusterNode pick,
+        int p,
+        GridCacheEntryInfo entry,
+        AffinityTopologyVersion topVer
+    ) throws IgniteCheckedException {
+        try {
+            GridCacheEntryEx cached = null;
+
+            try {
+                cached = cctx.dht().entryEx(entry.key());
+
+                if (log.isDebugEnabled())
+                    log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
+
+                // IGFS data cache over its space limit: drop the entry instead of caching it.
+                if (cctx.dht().isIgfsDataCache() &&
+                    cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
+                    LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
+                        "value, will ignore rebalance entries)");
+
+                    if (cached.markObsoleteIfEmpty(null))
+                        cached.context().cache().removeIfObsolete(cached.key());
+
+                    return true;
+                }
+
+                if (preloadPred == null || preloadPred.apply(entry)) {
+                    // initialValue() returns false if the entry already has a value.
+                    if (cached.initialValue(
+                        entry.value(),
+                        entry.version(),
+                        entry.ttl(),
+                        entry.expireTime(),
+                        true,
+                        topVer,
+                        cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
+                    )) {
+                        cctx.evicts().touch(cached, topVer); // Start tracking.
+
+                        if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
+                            cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
+                                (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
+                                false, null, null, null);
+                    }
+                    else if (log.isDebugEnabled())
+                        log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
+                            ", part=" + p + ']');
+                }
+                else if (log.isDebugEnabled())
+                    log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
+            }
+            catch (GridCacheEntryRemovedException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
+                        cached.key() + ", part=" + p + ']');
+            }
+            catch (GridDhtInvalidPartitionException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Partition became invalid during rebalancing (will ignore): " + p);
+
+                return false;
+            }
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            // Propagate interruption as-is; only wrap other checked failures below.
+            throw e;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
+                cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+        }
+
+        return true;
+    }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtPartitionDemander.class, this);
+ }
+
+ /**
+ * Sets last exchange future.
+ *
+ * @param lastFut Last future to set.
+ */
+ void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
+ lastExchangeFut = lastFut;
+ }
+
+ /**
+ *
+ */
+ public class SyncFuture extends GridFutureAdapter<Boolean> {
+ /** */
+ private static final long serialVersionUID = 1L;
+
+ /** Remaining. */
+ private ConcurrentHashMap8<UUID, Collection<Integer>> remaining = new ConcurrentHashMap8<>();
+
+ /** Missed. */
+ private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>();
+
+ /** Started. */
+ private ConcurrentHashMap8<UUID, Long> started = new ConcurrentHashMap8<>();
+
+ /** Lock. */
+ private Lock lock = new ReentrantLock();
+
+ /** Listener. */
+ private volatile GridLocalEventListener lsnr;
+
+ /** Assignments. */
+ private volatile GridDhtPreloaderAssignments assigns;
+
+ /** Completed. */
+ private volatile boolean completed = true;
+
+ /**
+ * @param assigns Assigns.
+ */
+ SyncFuture(GridDhtPreloaderAssignments assigns) {
+ this.assigns = assigns;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ public AffinityTopologyVersion topologyVersion() {
+ return assigns != null ? assigns.topologyVersion() : null;
+ }
+
+ /**
+ * @param assigns Assigns.
+ */
+ void init(GridDhtPreloaderAssignments assigns) {
+ final SyncFuture fut = this;
+
+ lsnr = new GridLocalEventListener() {
+ @Override public void onEvent(Event evt) {
+ fut.onCancel();
+ }
+ };
+
+ cctx.events().addListener(lsnr, EVT_NODE_FAILED);
+
+ this.assigns = assigns;
+ }
+
+ /**
+ * @return Initialised or not.
+ */
+ boolean isInited() {
+ return assigns != null;
+ }
+
+ /**
+ * @param nodeId Node id.
+ * @param parts Parts.
+ */
+ void append(UUID nodeId, Collection<Integer> parts) {
+ remaining.put(nodeId, parts);
+
+ missed.put(nodeId, new GridConcurrentHashSet<Integer>());
+ }
+
+ /**
+ * @param nodeId Node id.
+ * @param time Time.
+ */
+ void logStart(UUID nodeId, long time) {
+ started.put(nodeId, time);
+ }
+
+ /**
+ * @param node Node.
+ */
+ GridDhtPartitionDemandMessage getDemandMessage(ClusterNode node) {
+ if (isDone())
+ return null;
+
+ return assigns.get(node);
+ }
+
+        /**
+         * Cancels the whole rebalance round: clears the remaining-partitions map, marks the
+         * future as not successfully completed and finishes it via checkIsDone().
+         */
+        void onCancel() {
+            lock.lock();
+            try {
+                if (isDone())
+                    return;
+
+                remaining.clear();
+
+                completed = false;
+
+                // Fixed: 'completed' is set to false just above, so the former
+                // (!completed ? "Cancelled" : "Completed") ternary always yielded "Cancelled";
+                // the runtime log output is unchanged.
+                U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
+                    + ", topology=" + topologyVersion() +
+                    ", time=" +
+                    (started.isEmpty() ? 0 : (U.currentTimeMillis() - Collections.min(started.values()))) + " ms]");
+
+                checkIsDone();
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
+ /**
+ * @param nodeId Node id.
+ */
+ void onCancel(UUID nodeId) {
+ lock.lock();
+ try {
+ if (isDone())
+ return;
+
+ remaining.remove(nodeId);
+
+ completed = false;
+
+ U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
+ ", from node=" + nodeId + ", topology=" + topologyVersion() +
+ ", time=" + (U.currentTimeMillis() - started.get(nodeId)) + " ms]"));
+
+ checkIsDone();
+ }
+ finally {
+ lock.unlock();
+ }
+
+ }
+
+ /**
+ * @return Is completed.
+ */
+ boolean isCompleted() {
+ return completed;
+ }
+
+ /**
+ * @param nodeId Node id.
+ * @param p P.
+ */
+ void onMissedPartition(UUID nodeId, int p) {
+ lock.lock();
+ try {
+ if (isDone())
+ return;
+
+ if (missed.get(nodeId) == null)
+ missed.put(nodeId, new GridConcurrentHashSet<Integer>());
+
+ missed.get(nodeId).add(p);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @param nodeId Node id.
+ * @param p P.
+ */
+ void onPartitionDone(UUID nodeId, int p) {
+ lock.lock();
+ try {
+ if (isDone())
+ return;
+
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+ preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+ assigns.exchangeFuture().discoveryEvent());
+
+ Collection<Integer> parts = remaining.get(nodeId);
+
+ if (parts != null) {
+ parts.remove(p);
+
+ if (parts.isEmpty()) {
+ remaining.remove(nodeId);
+
+ U.log(log, ("Completed rebalancing [cache=" + cctx.name() +
+ ", from node=" + nodeId + ", topology=" + topologyVersion() +
+ ", time=" + (U.currentTimeMillis() - started.get(nodeId)) + " ms]"));
+ }
+ }
+
+ checkIsDone();
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ *
+ */
+ private void checkIsDone() {
+ if (remaining.isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Completed sync future.");
+
+ Collection<Integer> m = new HashSet<>();
+
+ for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) {
+ if (e.getValue() != null && !e.getValue().isEmpty())
+ m.addAll(e.getValue());
+ }
+
+ if (!m.isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Reassigning partitions that were missed: " + m);
+
+ cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture());
+ }
+
+ cctx.shared().exchange().scheduleResendPartitions();
+
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED))
+ preloadEvent(EVT_CACHE_REBALANCE_STOPPED, assigns.exchangeFuture().discoveryEvent());
+
+ if (lsnr != null)
+ cctx.events().removeListener(lsnr);
+
+ onDone(completed);
+
+ missed.clear();
+ remaining.clear();
+ started.clear();
+ assigns.clear();
+ }
+ }
+ }
+
+    /**
+     * Supply message wrapper pairing a legacy (pre-V2) supply message with its sender id,
+     * used by {@code DemandWorker} message queues.
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private static class SupplyMessage {
+        /** Sender ID. */
+        private UUID sndId;
+
+        /** Supply message. */
+        private GridDhtPartitionSupplyMessage supply;
+
+        /**
+         * Dummy constructor.
+         */
+        private SupplyMessage() {
+            // No-op.
+        }
+
+        /**
+         * @param sndId Sender ID.
+         * @param supply Supply message.
+         */
+        SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
+            this.sndId = sndId;
+            this.supply = supply;
+        }
+
+        /**
+         * @return Sender ID.
+         */
+        UUID senderId() {
+            return sndId;
+        }
+
+        /**
+         * @return Message.
+         */
+        GridDhtPartitionSupplyMessage supply() {
+            return supply;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SupplyMessage.class, this);
+        }
+    }
+
+ /** DemandWorker index. */
+ @Deprecated//Backward compatibility. To be removed in future.
+ private final AtomicInteger dmIdx = new AtomicInteger();
+
+ /**
+ *
+ */
+ @Deprecated//Backward compatibility. To be removed in future.
+ private class DemandWorker {
+ /** Worker ID. */
+ private int id;
+
+ /** Partition-to-node assignments. */
+ private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
+
+ /** Message queue. */
+ private final LinkedBlockingDeque<SupplyMessage> msgQ =
+ new LinkedBlockingDeque<>();
+
+ /** Counter. */
+ private long cntr;
+
+ /** Hide worker logger and use cache logger instead. */
+ private IgniteLogger log = GridDhtPartitionDemander.this.log;
+
+ private volatile SyncFuture fut;
+
+ /**
+ * @param id Worker ID.
+ */
+ private DemandWorker(int id, SyncFuture fut) {
+ assert id >= 0;
+
+ this.id = id;
+ this.fut = fut;
+ }
+
+ /**
+ * @param msg Message.
+ */
+ private void addMessage(SupplyMessage msg) {
+ msgQ.offer(msg);
+ }
+
+ /**
+ * @param deque Deque to poll from.
+ * @param time Time to wait.
+ * @return Polled item.
+ * @throws InterruptedException If interrupted.
+ */
+ @Nullable private <T> T poll(BlockingQueue<T> deque, long time) throws InterruptedException {
+ return deque.poll(time, MILLISECONDS);
+ }
+
+ /**
+ * @param idx Unique index for this topic.
+ * @return Topic for partition.
+ */
+ public Object topic(long idx) {
+ return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
+ }
+
+ /**
+ * @param node Node to demand from.
+ * @param topVer Topology version.
+ * @param d Demand message.
+ * @param exchFut Exchange future.
+ * @return Missed partitions.
+ * @throws InterruptedException If interrupted.
+ * @throws ClusterTopologyCheckedException If node left.
+ * @throws IgniteCheckedException If failed to send message.
+ */
+ private Set<Integer> demandFromNode(
+ ClusterNode node,
+ final AffinityTopologyVersion topVer,
+ GridDhtPartitionDemandMessage d,
+ GridDhtPartitionsExchangeFuture exchFut
+ ) throws InterruptedException, IgniteCheckedException {
+ GridDhtPartitionTopology top = cctx.dht().topology();
+
+ cntr++;
+
+ d.topic(topic(cntr));
+ d.workerId(id);
+
+ Set<Integer> missed = new HashSet<>();
+
+ // Get the same collection that will be sent in the message.
+ Collection<Integer> remaining = d.partitions();
+
+ if (topologyChanged(topVer))
+ return missed;
+
+ cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+ @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
+ addMessage(new SupplyMessage(nodeId, msg));
+ }
+ });
+
+ try {
+ boolean retry;
+
+ // DoWhile.
+ // =======
+ do {
+ retry = false;
+
+ // Create copy.
+ d = new GridDhtPartitionDemandMessage(d, remaining);
+
+ long timeout = cctx.config().getRebalanceTimeout();
+
+ d.timeout(timeout);
+
+ if (log.isDebugEnabled())
+ log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
+
+ // Send demand message.
+ cctx.io().send(node, d, cctx.ioPolicy());
+
+ // While.
+ // =====
+ while (!topologyChanged(topVer)) {
+ SupplyMessage s = poll(msgQ, timeout);
+
+ // If timed out.
+ if (s == null) {
+ if (msgQ.isEmpty()) { // Safety check.
+ U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout +
+ " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" +
+ " configuration properties).");
+
+ // Ordered listener was removed if timeout expired.
+ cctx.io().removeOrderedHandler(d.topic());
+
+ // Must create copy to be able to work with IO manager thread local caches.
+ d = new GridDhtPartitionDemandMessage(d, remaining);
+
+ // Create new topic.
+ d.topic(topic(++cntr));
+
+ // Create new ordered listener.
+ cctx.io().addOrderedHandler(d.topic(),
+ new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+ @Override public void apply(UUID nodeId,
+ GridDhtPartitionSupplyMessage msg) {
+ addMessage(new SupplyMessage(nodeId, msg));
+ }
+ });
+
+ // Resend message with larger timeout.
+ retry = true;
+
+ break; // While.
+ }
+ else
+ continue; // While.
+ }
+
+ // Check that message was received from expected node.
+ if (!s.senderId().equals(node.id())) {
+ U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
+ ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
+
+ continue; // While.
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Received supply message: " + s);
+
+ GridDhtPartitionSupplyMessage supply = s.supply();
+
+ // Check whether there were class loading errors on unmarshal
+ if (supply.classError() != null) {
+ if (log.isDebugEnabled())
+ log.debug("Class got undeployed during preloading: " + supply.classError());
+
+ retry = true;
+
+ // Quit preloading.
+ break;
+ }
+
+ // Preload.
+ for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
+ int p = e.getKey();
+
+ if (cctx.affinity().localNode(p, topVer)) {
+ GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+ assert part != null;
+
+ if (part.state() == MOVING) {
+ boolean reserved = part.reserve();
+
+ assert reserved : "Failed to reserve partition [gridName=" +
+ cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+
+ part.lock();
+
+ try {
+ Collection<Integer> invalidParts = new GridLeanSet<>();
+
+ // Loop through all received entries and try to preload them.
+ for (GridCacheEntryInfo entry : e.getValue().infos()) {
+ if (!invalidParts.contains(p)) {
+ if (!part.preloadingPermitted(entry.key(), entry.version())) {
+ if (log.isDebugEnabled())
+ log.debug("Preloading is not permitted for entry due to " +
+ "evictions [key=" + entry.key() +
+ ", ver=" + entry.version() + ']');
+
+ continue;
+ }
+
+ if (!preloadEntry(node, p, entry, topVer)) {
+ invalidParts.add(p);
+
+ if (log.isDebugEnabled())
+ log.debug("Got entries for invalid partition during " +
+ "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
+ }
+ }
+ }
+
+ boolean last = supply.last().contains(p);
+
+ // If message was last for this partition,
+ // then we take ownership.
+ if (last) {
+ remaining.remove(p);
+ fut.onPartitionDone(node.id(), p);
+
+ top.own(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Finished rebalancing partition: " + part);
+
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+ preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+ exchFut.discoveryEvent());
+ }
+ }
+ finally {
+ part.unlock();
+ part.release();
+ }
+ }
+ else {
+ remaining.remove(p);
+ fut.onPartitionDone(node.id(), p);
+
+ if (log.isDebugEnabled())
+ log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
+ }
+ }
+ else {
+ remaining.remove(p);
+ fut.onPartitionDone(node.id(), p);
+
+ if (log.isDebugEnabled())
+ log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
+ }
+ }
+
+ remaining.removeAll(s.supply().missed());
+
+ // Only request partitions based on latest topology version.
+ for (Integer miss : s.supply().missed()) {
+ if (cctx.affinity().localNode(miss, topVer))
+ missed.add(miss);
+
+ fut.onMissedPartition(node.id(), miss);
+ }
+
+ if (remaining.isEmpty())
+ break; // While.
+
+ if (s.supply().ack()) {
+ retry = true;
+
+ break;
+ }
+ }
+ }
+ while (retry && !topologyChanged(topVer));
+
+ return missed;
+ }
+ finally {
+ cctx.io().removeOrderedHandler(d.topic());
+ }
+ }
+
+ /**
+ * @param node Node.
+ * @param d D.
+ */
+ public void run(ClusterNode node, GridDhtPartitionDemandMessage d) {
+ demandLock.readLock().lock();
+
+ try {
+ GridDhtPartitionsExchangeFuture exchFut = fut.assigns.exchangeFuture();
+
+ AffinityTopologyVersion topVer = fut.assigns.topologyVersion();
+
+ Collection<Integer> missed = new HashSet<>();
+
+ if (topologyChanged(topVer)) {
+ fut.onCancel();
+
+ return;
+ }
+
+ try {
+ Set<Integer> set = demandFromNode(node, topVer, d, exchFut);
+
+ if (!set.isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" +
+ set + ']');
+
+ missed.addAll(set);
+ }
+ }
+ catch (ClusterTopologyCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
+ ", msg=" + e.getMessage() + ']');
+
+ fut.onCancel();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+ "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
+
+ fut.onCancel(node.id());
+ }
+ catch (InterruptedException e) {
+ fut.onCancel();
+ }
+ }
+ finally {
+ demandLock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
new file mode 100644
index 0000000..0686376
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -0,0 +1,783 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jsr166.*;
+
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+
+/**
+ * Thread pool for supplying partitions to demanding nodes.
+ */
+class GridDhtPartitionSupplier {
+ /** */
+ private final GridCacheContext<?, ?> cctx;
+
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private GridDhtPartitionTopology top;
+
+ /** */
+ private final boolean depEnabled;
+
+ /** Preload predicate. */
+ private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+ /** Supply context map. */
+ private ConcurrentHashMap8<T2, SupplyContext> scMap = new ConcurrentHashMap8<>();
+
+ /** Done map. */
+ private ConcurrentHashMap8<T2, Boolean> doneMap = new ConcurrentHashMap8<>();
+
+ /**
+ * @param cctx Cache context.
+ */
+ GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) {
+ assert cctx != null;
+
+ this.cctx = cctx;
+
+ log = cctx.logger(getClass());
+
+ top = cctx.dht().topology();
+
+ depEnabled = cctx.gridDeploy().enabled();
+ }
+
    /**
     * Starts the supplier by registering the backward-compatible demand message listener.
     */
    void start() {
        startOldListeners();
    }
+
    /**
     * Stops the supplier by dropping the topology reference.
     * NOTE(review): in-flight {@code handleDemandMessage} calls could observe {@code top == null}
     * after this - confirm callers quiesce message processing before stopping.
     */
    void stop() {
        top = null;
    }
+
    /**
     * Sets preload predicate for supply pool. Entries failing the predicate are skipped
     * (not sent to demanders) during supply iteration.
     *
     * @param preloadPred Preload predicate.
     */
    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
        this.preloadPred = preloadPred;
    }
+
    /**
     * Handles a partition demand message: streams entries of the requested partitions back to the
     * demander in batches. When the per-request batch budget is exhausted, the iteration state is
     * saved into {@code scMap} so that a follow-up (empty-partition-set) demand message resumes
     * from where this one stopped.
     *
     * Per partition, supply proceeds through phases:
     * 1 - on-heap entries, 2 - swap/offheap entries, 3 - entries collected by the swap listener
     * registered during phase 0.
     *
     * @param d Demand message.
     * @param id Node uuid.
     */
    public void handleDemandMessage(UUID id, GridDhtPartitionDemandMessage d) {
        assert d != null;
        assert id != null;

        // Ignore requests made against a stale topology version.
        if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
            return;

        GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(d.workerId(),
            d.updateSequence(), cctx.cacheId(), d.topologyVersion());

        long preloadThrottle = cctx.config().getRebalanceThrottle();

        ClusterNode node = cctx.discovery().node(id);

        // Supply context key: (demander node id, rebalance topic).
        T2<UUID, Object> scId = new T2<>(id, d.topic());

        try {
            if (!d.partitions().isEmpty()) {//Only initial request contains partitions.
                doneMap.remove(scId);
                scMap.remove(scId);
            }

            // Continuation state from a previous request for the same (node, topic), if any.
            SupplyContext sctx = scMap.remove(scId);

            if (doneMap.get(scId) != null)
                return;

            // Batches sent so far for this request.
            long bCnt = 0;

            int phase = 0;

            boolean newReq = true;

            long maxBatchesCnt = cctx.config().getRebalanceBatchesCount();

            if (sctx != null) {
                phase = sctx.phase;

                // Continuation requests send a single batch per message.
                maxBatchesCnt = 1;
            }

            Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();

            // First iteration of a continuation resumes the saved partition (newReq guard).
            while ((sctx != null && newReq) || partIt.hasNext()) {
                int part = sctx != null && newReq ? sctx.part : partIt.next();

                newReq = false;

                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);

                if (loc == null || loc.state() != OWNING || !loc.reserve()) {
                    // Reply with partition of "-1" to let sender know that
                    // this node is no longer an owner.
                    s.missed(part);

                    if (log.isDebugEnabled())
                        log.debug("Requested partition is not owned by local node [part=" + part +
                            ", demander=" + id + ']');

                    continue;
                }

                GridCacheEntryInfoCollectSwapListener swapLsnr = null;

                try {
                    // Register a listener up-front so entries promoted from swap/offheap during
                    // iteration are captured and sent in phase 3.
                    if (phase == 0 && cctx.isSwapOrOffheapEnabled()) {
                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);

                        cctx.swap().addOffHeapListener(part, swapLsnr);
                        cctx.swap().addSwapListener(part, swapLsnr);
                    }

                    boolean partMissing = false;

                    if (phase == 0)
                        phase = 1;

                    // Phase 1: iterate on-heap entries of the partition.
                    if (phase == 1) {
                        Iterator<GridDhtCacheEntry> entIt = sctx != null ?
                            (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator();

                        while (entIt.hasNext()) {
                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
                                // Demander no longer needs this partition, so we send '-1' partition and move on.
                                s.missed(part);

                                if (log.isDebugEnabled())
                                    log.debug("Demanding node does not need requested partition [part=" + part +
                                        ", nodeId=" + id + ']');

                                partMissing = true;

                                break;
                            }

                            // Batch is full - flush it, then either save state and stop
                            // (budget exhausted) or start a new batch.
                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
                                if (!reply(node, d, s))
                                    return;

                                // Throttle preloading.
                                if (preloadThrottle > 0)
                                    U.sleep(preloadThrottle);

                                if (++bCnt >= maxBatchesCnt) {
                                    saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr);

                                    // Ownership of the listener passed to the saved context;
                                    // prevent removal in the finally block below.
                                    swapLsnr = null;

                                    return;
                                }
                                else {
                                    s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
                                        cctx.cacheId(), d.topologyVersion());
                                }
                            }

                            GridCacheEntryEx e = entIt.next();

                            GridCacheEntryInfo info = e.info();

                            if (info != null && !info.isNew()) {
                                if (preloadPred == null || preloadPred.apply(info))
                                    s.addEntry(part, info, cctx);
                                else if (log.isDebugEnabled())
                                    log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
                                        info);
                            }
                        }

                        if (partMissing)
                            continue;

                    }

                    if (phase == 1)
                        phase = 2;

                    // Phase 2: iterate entries currently in swap/offheap storage.
                    if (phase == 2 && cctx.isSwapOrOffheapEnabled()) {
                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = sctx != null ?
                            (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt :
                            cctx.swap().iterator(part);

                        // Iterator may be null if space does not exist.
                        if (iter != null) {
                            try {
                                boolean prepared = false;

                                while (iter.hasNext()) {
                                    if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
                                        // Demander no longer needs this partition,
                                        // so we send '-1' partition and move on.
                                        s.missed(part);

                                        if (log.isDebugEnabled())
                                            log.debug("Demanding node does not need requested partition " +
                                                "[part=" + part + ", nodeId=" + id + ']');

                                        partMissing = true;

                                        break; // For.
                                    }

                                    if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
                                        if (!reply(node, d, s))
                                            return;

                                        // Throttle preloading.
                                        if (preloadThrottle > 0)
                                            U.sleep(preloadThrottle);

                                        if (++bCnt >= maxBatchesCnt) {
                                            saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);

                                            swapLsnr = null;

                                            return;
                                        }
                                        else {
                                            s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
                                                cctx.cacheId(), d.topologyVersion());
                                        }
                                    }

                                    Map.Entry<byte[], GridCacheSwapEntry> e = iter.next();

                                    GridCacheSwapEntry swapEntry = e.getValue();

                                    GridCacheEntryInfo info = new GridCacheEntryInfo();

                                    info.keyBytes(e.getKey());
                                    info.ttl(swapEntry.ttl());
                                    info.expireTime(swapEntry.expireTime());
                                    info.version(swapEntry.version());
                                    info.value(swapEntry.value());

                                    if (preloadPred == null || preloadPred.apply(info))
                                        s.addEntry0(part, info, cctx);
                                    else {
                                        if (log.isDebugEnabled())
                                            log.debug("Rebalance predicate evaluated to false (will not send " +
                                                "cache entry): " + info);

                                        continue;
                                    }

                                    // Need to manually prepare cache message.
                                    if (depEnabled && !prepared) {
                                        ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
                                            cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
                                            swapEntry.valueClassLoaderId() != null ?
                                                cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
                                                null;

                                        if (ldr == null)
                                            continue;

                                        if (ldr instanceof GridDeploymentInfo) {
                                            s.prepare((GridDeploymentInfo)ldr);

                                            prepared = true;
                                        }
                                    }
                                }

                                if (partMissing)
                                    continue;
                            }
                            finally {
                                iter.close();
                            }
                        }
                    }

                    // When resuming in phase 3, the listener lives in the saved context.
                    if (swapLsnr == null && sctx != null)
                        swapLsnr = sctx.swapLsnr;

                    // Stop receiving promote notifications.
                    if (swapLsnr != null) {
                        cctx.swap().removeOffHeapListener(part, swapLsnr);
                        cctx.swap().removeSwapListener(part, swapLsnr);
                    }

                    if (phase == 2)
                        phase = 3;

                    // Phase 3: send entries the swap listener collected while we were iterating.
                    if (phase == 3 && swapLsnr != null) {
                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();

                        swapLsnr = null;

                        Iterator<GridCacheEntryInfo> lsnrIt = sctx != null ?
                            (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator();

                        while (lsnrIt.hasNext()) {
                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
                                // Demander no longer needs this partition,
                                // so we send '-1' partition and move on.
                                s.missed(part);

                                if (log.isDebugEnabled())
                                    log.debug("Demanding node does not need requested partition " +
                                        "[part=" + part + ", nodeId=" + id + ']');

                                // No need to continue iteration over swap entries.
                                break;
                            }

                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
                                if (!reply(node, d, s))
                                    return;

                                // Throttle preloading.
                                if (preloadThrottle > 0)
                                    U.sleep(preloadThrottle);

                                if (++bCnt >= maxBatchesCnt) {
                                    saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr);

                                    return;
                                }
                                else {
                                    s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
                                        cctx.cacheId(), d.topologyVersion());
                                }
                            }

                            GridCacheEntryInfo info = lsnrIt.next();

                            if (preloadPred == null || preloadPred.apply(info))
                                s.addEntry(part, info, cctx);
                            else if (log.isDebugEnabled())
                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
                                    info);
                        }
                    }

                    // Mark as last supply message.
                    s.last(part);

                    // Partition fully supplied: reset phase machine for the next partition.
                    phase = 0;

                    sctx = null;
                }
                finally {
                    loc.release();

                    if (swapLsnr != null) {
                        cctx.swap().removeOffHeapListener(part, swapLsnr);
                        cctx.swap().removeSwapListener(part, swapLsnr);
                    }
                }
            }

            // Flush the final (possibly partial) batch and mark this request complete.
            reply(node, d, s);

            doneMap.put(scId, true);
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to send partition supply message to node: " + id, e);
        }
    }
+
+ /**
+ * @param n Node.
+ * @param d DemandMessage
+ * @param s Supply message.
+ * @return {@code True} if message was sent, {@code false} if recipient left grid.
+ * @throws IgniteCheckedException If failed.
+ */
+ private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessageV2 s)
+ throws IgniteCheckedException {
+
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+
+ cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+
+ return true;
+ }
+ catch (ClusterTopologyCheckedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send partition supply message because node left grid: " + n.id());
+
+ return false;
+ }
+ }
+
+ /**
+ * @param t Tuple.
+ * @param phase Phase.
+ * @param partIt Partition it.
+ * @param part Partition.
+ * @param entryIt Entry it.
+ * @param swapLsnr Swap listener.
+ */
+ private void saveSupplyContext(
+ T2 t,
+ int phase,
+ Iterator<Integer> partIt,
+ int part,
+ Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr) {
+ scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part));
+ }
+
+ /**
+ * Supply context.
+ */
+ private static class SupplyContext {
+ /** Phase. */
+ private int phase;
+
+ /** Partition iterator. */
+ private Iterator<Integer> partIt;
+
+ /** Entry iterator. */
+ private Iterator<?> entryIt;
+
+ /** Swap listener. */
+ private GridCacheEntryInfoCollectSwapListener swapLsnr;
+
+ /** Partition. */
+ int part;
+
+ /**
+ * @param phase Phase.
+ * @param partIt Partition iterator.
+ * @param entryIt Entry iterator.
+ * @param swapLsnr Swap listener.
+ * @param part Partition.
+ */
+ public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt,
+ GridCacheEntryInfoCollectSwapListener swapLsnr, int part) {
+ this.phase = phase;
+ this.partIt = partIt;
+ this.entryIt = entryIt;
+ this.swapLsnr = swapLsnr;
+ this.part = part;
+ }
+ }
+
+ @Deprecated//Backward compatibility. To be removed in future.
+ public void startOldListeners() {
+ if (!cctx.kernalContext().clientNode()) {
+ int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
+
+ cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
+ @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+ processOldDemandMessage(m, id);
+ }
+ });
+ }
+ }
+
    /**
     * Processes a demand message using the old (pre-V2) rebalancing protocol: iterates the
     * requested partitions and streams their entries (on-heap, then swap/offheap, then entries
     * collected by the swap listener) in batched {@link GridDhtPartitionSupplyMessage}s.
     *
     * @param d Demand message.
     * @param id Demander node id.
     */
    @Deprecated//Backward compatibility. To be removed in future.
    private void processOldDemandMessage(GridDhtPartitionDemandMessage d, UUID id) {
        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
            d.updateSequence(), cctx.cacheId());

        // NOTE(review): node may be null if the demander already left the grid - confirm
        // callers guarantee liveness, otherwise the affinity checks below would NPE.
        ClusterNode node = cctx.node(id);

        long preloadThrottle = cctx.config().getRebalanceThrottle();

        // Set once a batch has been flushed mid-partition; triggers markAck() below.
        boolean ack = false;

        try {
            for (int part : d.partitions()) {
                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);

                if (loc == null || loc.state() != OWNING || !loc.reserve()) {
                    // Reply with partition of "-1" to let sender know that
                    // this node is no longer an owner.
                    s.missed(part);

                    if (log.isDebugEnabled())
                        log.debug("Requested partition is not owned by local node [part=" + part +
                            ", demander=" + id + ']');

                    continue;
                }

                GridCacheEntryInfoCollectSwapListener swapLsnr = null;

                try {
                    // Capture entries promoted from swap/offheap while we iterate.
                    if (cctx.isSwapOrOffheapEnabled()) {
                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);

                        cctx.swap().addOffHeapListener(part, swapLsnr);
                        cctx.swap().addSwapListener(part, swapLsnr);
                    }

                    boolean partMissing = false;

                    // Pass 1: on-heap entries.
                    for (GridCacheEntryEx e : loc.entries()) {
                        if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
                            // Demander no longer needs this partition, so we send '-1' partition and move on.
                            s.missed(part);

                            if (log.isDebugEnabled())
                                log.debug("Demanding node does not need requested partition [part=" + part +
                                    ", nodeId=" + id + ']');

                            partMissing = true;

                            break;
                        }

                        if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
                            ack = true;

                            if (!replyOld(node, d, s))
                                return;

                            // Throttle preloading.
                            if (preloadThrottle > 0)
                                U.sleep(preloadThrottle);

                            s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
                                cctx.cacheId());
                        }

                        GridCacheEntryInfo info = e.info();

                        if (info != null && !info.isNew()) {
                            if (preloadPred == null || preloadPred.apply(info))
                                s.addEntry(part, info, cctx);
                            else if (log.isDebugEnabled())
                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
                                    info);
                        }
                    }

                    if (partMissing)
                        continue;

                    // Pass 2: entries currently in swap/offheap storage.
                    if (cctx.isSwapOrOffheapEnabled()) {
                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
                            cctx.swap().iterator(part);

                        // Iterator may be null if space does not exist.
                        if (iter != null) {
                            try {
                                boolean prepared = false;

                                for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
                                    if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
                                        // Demander no longer needs this partition,
                                        // so we send '-1' partition and move on.
                                        s.missed(part);

                                        if (log.isDebugEnabled())
                                            log.debug("Demanding node does not need requested partition " +
                                                "[part=" + part + ", nodeId=" + id + ']');

                                        partMissing = true;

                                        break; // For.
                                    }

                                    if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
                                        ack = true;

                                        if (!replyOld(node, d, s))
                                            return;

                                        // Throttle preloading.
                                        if (preloadThrottle > 0)
                                            U.sleep(preloadThrottle);

                                        s = new GridDhtPartitionSupplyMessage(d.workerId(),
                                            d.updateSequence(), cctx.cacheId());
                                    }

                                    GridCacheSwapEntry swapEntry = e.getValue();

                                    GridCacheEntryInfo info = new GridCacheEntryInfo();

                                    info.keyBytes(e.getKey());
                                    info.ttl(swapEntry.ttl());
                                    info.expireTime(swapEntry.expireTime());
                                    info.version(swapEntry.version());
                                    info.value(swapEntry.value());

                                    if (preloadPred == null || preloadPred.apply(info))
                                        s.addEntry0(part, info, cctx);
                                    else {
                                        if (log.isDebugEnabled())
                                            log.debug("Rebalance predicate evaluated to false (will not send " +
                                                "cache entry): " + info);

                                        continue;
                                    }

                                    // Need to manually prepare cache message.
                                    if (depEnabled && !prepared) {
                                        ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
                                            cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
                                            swapEntry.valueClassLoaderId() != null ?
                                                cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
                                                null;

                                        if (ldr == null)
                                            continue;

                                        if (ldr instanceof GridDeploymentInfo) {
                                            s.prepare((GridDeploymentInfo)ldr);

                                            prepared = true;
                                        }
                                    }
                                }

                                if (partMissing)
                                    continue;
                            }
                            finally {
                                iter.close();
                            }
                        }
                    }

                    // Stop receiving promote notifications.
                    if (swapLsnr != null) {
                        cctx.swap().removeOffHeapListener(part, swapLsnr);
                        cctx.swap().removeSwapListener(part, swapLsnr);
                    }

                    // Pass 3: entries the swap listener collected during iteration.
                    if (swapLsnr != null) {
                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();

                        swapLsnr = null;

                        for (GridCacheEntryInfo info : entries) {
                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
                                // Demander no longer needs this partition,
                                // so we send '-1' partition and move on.
                                s.missed(part);

                                if (log.isDebugEnabled())
                                    log.debug("Demanding node does not need requested partition " +
                                        "[part=" + part + ", nodeId=" + id + ']');

                                // No need to continue iteration over swap entries.
                                break;
                            }

                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
                                ack = true;

                                if (!replyOld(node, d, s))
                                    return;

                                s = new GridDhtPartitionSupplyMessage(d.workerId(),
                                    d.updateSequence(),
                                    cctx.cacheId());
                            }

                            if (preloadPred == null || preloadPred.apply(info))
                                s.addEntry(part, info, cctx);
                            else if (log.isDebugEnabled())
                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
                                    info);
                        }
                    }

                    // Mark as last supply message.
                    s.last(part);

                    if (ack) {
                        s.markAck();

                        break; // Partition for loop.
                    }
                }
                finally {
                    loc.release();

                    if (swapLsnr != null) {
                        cctx.swap().removeOffHeapListener(part, swapLsnr);
                        cctx.swap().removeSwapListener(part, swapLsnr);
                    }
                }
            }

            // Flush the final (possibly partial) batch.
            replyOld(node, d, s);
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to send partition supply message to node: " + node.id(), e);
        }
    }
+
+ /**
+ * @param n Node.
+ * @param d Demand message.
+ * @param s Supply message.
+ * @return {@code True} if message was sent, {@code false} if recipient left grid.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Deprecated//Backward compatibility. To be removed in future.
+ private boolean replyOld(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
+ throws IgniteCheckedException {
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+
+ cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+
+ return true;
+ }
+ catch (ClusterTopologyCheckedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send partition supply message because node left grid: " + n.id());
+
+ return false;
+ }
+ }
+}