You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/11/09 11:15:29 UTC
[35/50] [abbrv] ignite git commit: Ignite-1093 "Rebalancing with
default parameters is very slow" fixes.
http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
new file mode 100644
index 0000000..865bad8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -0,0 +1,1034 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfoCollectSwapListener;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.IgniteSpiException;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+
+/**
+ * Thread pool for supplying partitions to demanding nodes.
+ */
+class GridDhtPartitionSupplier {
+    /** Cache context. */
+    private final GridCacheContext<?, ?> cctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Partition topology of the cache; assigned once in the constructor. */
+    private final GridDhtPartitionTopology top;
+
+    /** Whether peer class deployment is enabled (cached from {@code cctx.gridDeploy().enabled()}). */
+    private final boolean depEnabled;
+
+    /** Preload predicate filtering supplied entries; {@code null} means every entry is supplied. */
+    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+    /**
+     * Supply context map. T3: nodeId, idx, topVer.
+     * Every access is performed while synchronized on the map itself.
+     */
+    private final Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> scMap = new HashMap<>();
+
+    /**
+     * Creates a supplier bound to the given cache, caching its logger, partition topology
+     * and peer-deployment flag.
+     *
+     * @param cctx Cache context.
+     */
+    GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) {
+        assert cctx != null;
+
+        this.cctx = cctx;
+        this.log = cctx.logger(getClass());
+        this.top = cctx.dht().topology();
+        this.depEnabled = cctx.gridDeploy().enabled();
+    }
+
+    /**
+     * Starts the supplier: registers the legacy demand message handler (when applicable).
+     */
+    void start() {
+        startOldListeners();
+    }
+
+    /**
+     * Stops the supplier: releases the resources held by every saved supply context, empties the
+     * context map and unregisters the legacy demand message handler.
+     */
+    void stop() {
+        synchronized (scMap) {
+            for (Iterator<SupplyContext> ctxIt = scMap.values().iterator(); ctxIt.hasNext(); ) {
+                clearContext(ctxIt.next(), log);
+
+                ctxIt.remove();
+            }
+        }
+
+        stopOldListeners();
+    }
+
+    /**
+     * Releases every resource held by a supply context: closes its entry iterator (when it is a
+     * not-yet-closed {@link GridCloseableIterator}), unregisters its swap/off-heap listener and
+     * releases its partition reservation.
+     *
+     * @param sc Supply context; {@code null} is a no-op.
+     * @param log Logger used to report iterator close failures.
+     */
+    private void clearContext(
+        final SupplyContext sc,
+        final IgniteLogger log) {
+        if (sc == null)
+            return;
+
+        final Iterator<?> it = sc.entryIt;
+
+        if (it instanceof GridCloseableIterator && !((GridCloseableIterator<?>)it).isClosed()) {
+            try {
+                ((GridCloseableIterator<?>)it).close();
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Iterator close failed.", e);
+            }
+        }
+
+        // Once a context is discarded nothing else unregisters its listener, so do it here
+        // to avoid leaking it (and the entries it keeps collecting).
+        final GridCacheEntryInfoCollectSwapListener swapLsnr = sc.swapLsnr;
+
+        if (swapLsnr != null) {
+            cctx.swap().removeOffHeapListener(sc.part, swapLsnr);
+            cctx.swap().removeSwapListener(sc.part, swapLsnr);
+        }
+
+        final GridDhtLocalPartition loc = sc.loc;
+
+        if (loc != null) {
+            assert loc.reservations() > 0;
+
+            loc.release();
+        }
+    }
+
+    /**
+     * Handles new topology: discards (and releases the resources of) every saved supply context
+     * whose demanded topology version is older than {@code topVer}.
+     *
+     * @param topVer Topology version.
+     */
+    public void onTopologyChanged(AffinityTopologyVersion topVer) {
+        synchronized (scMap) {
+            Iterator<Map.Entry<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext>> entries =
+                scMap.entrySet().iterator();
+
+            while (entries.hasNext()) {
+                Map.Entry<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> entry = entries.next();
+
+                T3<UUID, Integer, AffinityTopologyVersion> key = entry.getKey();
+
+                // Contexts created for the current (or a newer) topology stay.
+                if (topVer.compareTo(key.get3()) <= 0)
+                    continue;
+
+                clearContext(entry.getValue(), log);
+
+                entries.remove();
+
+                if (log.isDebugEnabled())
+                    log.debug("Supply context removed [node=" + key.get1() + "]");
+            }
+        }
+    }
+
+    /**
+     * Installs the filter applied to every entry before it is put into a supply message.
+     * Passing {@code null} disables filtering.
+     *
+     * @param preloadPred Preload predicate.
+     */
+    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
+        this.preloadPred = preloadPred;
+    }
+
+    /**
+     * Handles a demand message: supplies entries of the requested partitions to the demanding node.
+     * <p>
+     * An initial demand carries the partition list and is answered with up to
+     * {@code getRebalanceBatchesPrefetchCount()} batches. If more data remains, the iteration state is
+     * saved in {@code scMap} under {@code (id, idx, topVer)}; every follow-up demand (which carries no
+     * partition list) resumes it and produces one more batch. Every partition is supplied in phases:
+     * on-heap entries, swap/off-heap entries, then the entries collected by the swap listener while
+     * the previous phases were running. A demand with update sequence {@code -1} only asks to discard
+     * the saved context.
+     *
+     * @param idx Index.
+     * @param id Demanding node id.
+     * @param d Demand message.
+     */
+    @SuppressWarnings("unchecked")
+    public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) {
+        assert d != null;
+        assert id != null;
+
+        AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion demTop = d.topologyVersion();
+
+        T3<UUID, Integer, AffinityTopologyVersion> scId = new T3<>(id, idx, demTop);
+
+        if (d.updateSequence() == -1) { // Demand node requested context cleanup.
+            synchronized (scMap) {
+                clearContext(scMap.remove(scId), log);
+            }
+
+            return;
+        }
+
+        if (cutTop.compareTo(demTop) > 0) {
+            if (log.isDebugEnabled())
+                log.debug("Demand request cancelled [current=" + cutTop + ", demanded=" + demTop +
+                    ", from=" + id + ", idx=" + idx + "]");
+
+            return;
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Demand request accepted [current=" + cutTop + ", demanded=" + demTop +
+                ", from=" + id + ", idx=" + idx + "]");
+
+        GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(
+            d.updateSequence(), cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
+
+        ClusterNode node = cctx.discovery().node(id);
+
+        if (node == null)
+            return; // Context will be cleaned at topology change.
+
+        try {
+            SupplyContext sctx;
+
+            synchronized (scMap) {
+                sctx = scMap.remove(scId);
+
+                assert sctx == null || d.updateSequence() == sctx.updateSeq;
+            }
+
+            // Initial demand request should contain partitions list.
+            if (sctx == null && d.partitions() == null)
+                return;
+
+            assert !(sctx != null && d.partitions() != null);
+
+            long bCnt = 0;
+
+            SupplyContextPhase phase = SupplyContextPhase.NEW;
+
+            boolean newReq = true;
+
+            long maxBatchesCnt = cctx.config().getRebalanceBatchesPrefetchCount();
+
+            if (sctx != null) {
+                phase = sctx.phase;
+
+                maxBatchesCnt = 1;
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Starting supplying rebalancing [cache=" + cctx.name() +
+                        ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
+                        ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
+                        ", idx=" + idx + "]");
+            }
+
+            Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
+
+            while ((sctx != null && newReq) || partIt.hasNext()) {
+                // Only the first partition of a follow-up request resumes the saved context. Every other
+                // partition - including the one after a skipped ("missed") partition - starts from scratch.
+                boolean resume = sctx != null && newReq;
+
+                newReq = false;
+
+                if (!resume) {
+                    sctx = null;
+
+                    phase = SupplyContextPhase.NEW;
+                }
+
+                int part = resume ? sctx.part : partIt.next();
+
+                GridDhtLocalPartition loc;
+
+                if (sctx != null && sctx.loc != null) {
+                    loc = sctx.loc;
+
+                    assert loc.reservations() > 0;
+                }
+                else {
+                    loc = top.localPartition(part, d.topologyVersion(), false);
+
+                    if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+                        // Reply with partition of "-1" to let sender know that
+                        // this node is no longer an owner.
+                        s.missed(part);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Requested partition is not owned by local node [part=" + part +
+                                ", demander=" + id + ']');
+
+                        continue;
+                    }
+                }
+
+                // A resumed context owns the listener registered when the partition was started: restore it
+                // so it is handed to the next saved context and unregistered when the partition is done/skipped.
+                GridCacheEntryInfoCollectSwapListener swapLsnr = sctx != null ? sctx.swapLsnr : null;
+
+                try {
+                    if (phase == SupplyContextPhase.NEW && cctx.isSwapOrOffheapEnabled()) {
+                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+
+                        cctx.swap().addOffHeapListener(part, swapLsnr);
+                        cctx.swap().addSwapListener(part, swapLsnr);
+                    }
+
+                    boolean partMissing = false;
+
+                    if (phase == SupplyContextPhase.NEW)
+                        phase = SupplyContextPhase.ONHEAP;
+
+                    if (phase == SupplyContextPhase.ONHEAP) {
+                        Iterator<GridDhtCacheEntry> entIt = sctx != null ?
+                            (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator();
+
+                        while (entIt.hasNext()) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition, so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition [part=" + part +
+                                        ", nodeId=" + id + ']');
+
+                                partMissing = true;
+
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                if (++bCnt >= maxBatchesCnt) {
+                                    saveSupplyContext(scId,
+                                        phase,
+                                        partIt,
+                                        part,
+                                        entIt,
+                                        swapLsnr,
+                                        loc,
+                                        d.topologyVersion(),
+                                        d.updateSequence());
+
+                                    // Listener and reservation are now owned by the saved context.
+                                    swapLsnr = null;
+                                    loc = null;
+
+                                    reply(node, d, s, scId);
+
+                                    return;
+                                }
+                                else {
+                                    if (!reply(node, d, s, scId))
+                                        return;
+
+                                    s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+                                        cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
+                                }
+                            }
+
+                            GridCacheEntryEx e = entIt.next();
+
+                            GridCacheEntryInfo info = e.info();
+
+                            if (info != null && !info.isNew()) {
+                                if (preloadPred == null || preloadPred.apply(info))
+                                    s.addEntry(part, info, cctx);
+                                else if (log.isDebugEnabled())
+                                    log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+                                        info);
+                            }
+                        }
+
+                        if (partMissing)
+                            continue;
+                    }
+
+                    if (phase == SupplyContextPhase.ONHEAP) {
+                        phase = SupplyContextPhase.SWAP;
+
+                        if (sctx != null)
+                            sctx = new SupplyContext(phase, partIt, null, swapLsnr, part, loc, d.updateSequence());
+                    }
+
+                    if (phase == SupplyContextPhase.SWAP && cctx.isSwapOrOffheapEnabled()) {
+                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+                            sctx != null && sctx.entryIt != null ?
+                                (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt :
+                                cctx.swap().iterator(part);
+
+                        // Iterator may be null if space does not exist.
+                        if (iter != null) {
+                            // Set once the iterator is handed to a saved context; otherwise it is closed
+                            // on every exit path, including a failed reply.
+                            boolean iterSaved = false;
+
+                            try {
+                                boolean prepared = false;
+
+                                while (iter.hasNext()) {
+                                    if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                        // Demander no longer needs this partition,
+                                        // so we send '-1' partition and move on.
+                                        s.missed(part);
+
+                                        if (log.isDebugEnabled())
+                                            log.debug("Demanding node does not need requested partition " +
+                                                "[part=" + part + ", nodeId=" + id + ']');
+
+                                        partMissing = true;
+
+                                        break; // For.
+                                    }
+
+                                    if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                        if (++bCnt >= maxBatchesCnt) {
+                                            saveSupplyContext(scId,
+                                                phase,
+                                                partIt,
+                                                part,
+                                                iter,
+                                                swapLsnr,
+                                                loc,
+                                                d.topologyVersion(),
+                                                d.updateSequence());
+
+                                            // Iterator, listener and reservation are now owned by the saved context.
+                                            iterSaved = true;
+                                            swapLsnr = null;
+                                            loc = null;
+
+                                            reply(node, d, s, scId);
+
+                                            return;
+                                        }
+                                        else {
+                                            if (!reply(node, d, s, scId))
+                                                return;
+
+                                            s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+                                                cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
+                                        }
+                                    }
+
+                                    Map.Entry<byte[], GridCacheSwapEntry> e = iter.next();
+
+                                    GridCacheSwapEntry swapEntry = e.getValue();
+
+                                    GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+                                    info.keyBytes(e.getKey());
+                                    info.ttl(swapEntry.ttl());
+                                    info.expireTime(swapEntry.expireTime());
+                                    info.version(swapEntry.version());
+                                    info.value(swapEntry.value());
+
+                                    if (preloadPred == null || preloadPred.apply(info))
+                                        s.addEntry0(part, info, cctx);
+                                    else {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Rebalance predicate evaluated to false (will not send " +
+                                                "cache entry): " + info);
+
+                                        continue;
+                                    }
+
+                                    // Need to manually prepare cache message.
+                                    if (depEnabled && !prepared) {
+                                        ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+                                            cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+                                            swapEntry.valueClassLoaderId() != null ?
+                                                cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+                                                null;
+
+                                        if (ldr == null)
+                                            continue;
+
+                                        if (ldr instanceof GridDeploymentInfo) {
+                                            s.prepare((GridDeploymentInfo)ldr);
+
+                                            prepared = true;
+                                        }
+                                    }
+                                }
+                            }
+                            finally {
+                                if (!iterSaved)
+                                    iter.close();
+                            }
+
+                            if (partMissing)
+                                continue;
+                        }
+                    }
+
+                    // Stop receiving promote notifications.
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+
+                    if (phase == SupplyContextPhase.SWAP) {
+                        phase = SupplyContextPhase.EVICTED;
+
+                        if (sctx != null)
+                            sctx = new SupplyContext(phase, partIt, null, null, part, loc, d.updateSequence());
+                    }
+
+                    if (phase == SupplyContextPhase.EVICTED) {
+                        // A context resumed in this phase carries the iterator over the collected entries
+                        // (its listener is already null); otherwise iterate what the listener collected.
+                        Iterator<GridCacheEntryInfo> lsnrIt = null;
+
+                        if (sctx != null && sctx.entryIt != null)
+                            lsnrIt = (Iterator<GridCacheEntryInfo>)sctx.entryIt;
+                        else if (swapLsnr != null)
+                            lsnrIt = swapLsnr.entries().iterator();
+
+                        // Listeners were unregistered above; nothing left to clean up for them.
+                        swapLsnr = null;
+
+                        while (lsnrIt != null && lsnrIt.hasNext()) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition,
+                                // so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition " +
+                                        "[part=" + part + ", nodeId=" + id + ']');
+
+                                // No need to continue iteration over swap entries.
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                if (++bCnt >= maxBatchesCnt) {
+                                    saveSupplyContext(scId,
+                                        phase,
+                                        partIt,
+                                        part,
+                                        lsnrIt,
+                                        null,
+                                        loc,
+                                        d.topologyVersion(),
+                                        d.updateSequence());
+
+                                    // Reservation is now owned by the saved context.
+                                    loc = null;
+
+                                    reply(node, d, s, scId);
+
+                                    return;
+                                }
+                                else {
+                                    if (!reply(node, d, s, scId))
+                                        return;
+
+                                    s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+                                        cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
+                                }
+                            }
+
+                            GridCacheEntryInfo info = lsnrIt.next();
+
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    // Mark as last supply message.
+                    s.last(part);
+                }
+                finally {
+                    if (loc != null)
+                        loc.release();
+
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+                }
+            }
+
+            reply(node, d, s, scId);
+
+            if (log.isDebugEnabled())
+                log.debug("Finished supplying rebalancing [cache=" + cctx.name() +
+                    ", fromNode=" + node.id() +
+                    ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
+                    ", idx=" + idx + "]");
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send partition supply message to node: " + id, e);
+        }
+        catch (IgniteSpiException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send message to node (current node is stopping?) [node=" + node.id() +
+                    ", msg=" + e.getMessage() + ']');
+        }
+    }
+
+    /**
+     * Sends a supply message to the demanding node, then sleeps for the configured rebalance
+     * throttle (if positive).
+     *
+     * @param n Node.
+     * @param d Demand message.
+     * @param s Supply message.
+     * @param scId Supply context key; the saved context under it is discarded if the recipient left the grid.
+     * @return {@code True} if message was sent, {@code false} if recipient left grid.
+     * @throws IgniteCheckedException If failed.
+     */
+    private boolean reply(ClusterNode n,
+        GridDhtPartitionDemandMessage d,
+        GridDhtPartitionSupplyMessageV2 s,
+        T3<UUID, Integer, AffinityTopologyVersion> scId)
+        throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+
+        try {
+            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+        }
+        catch (ClusterTopologyCheckedException ignore) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send partition supply message because node left grid: " + n.id());
+
+            // Recipient is gone: nobody will resume its context.
+            synchronized (scMap) {
+                clearContext(scMap.remove(scId), log);
+            }
+
+            return false;
+        }
+
+        // Throttle preloading.
+        long throttle = cctx.config().getRebalanceThrottle();
+
+        if (throttle > 0)
+            U.sleep(throttle);
+
+        return true;
+    }
+
+    /**
+     * Saves the iteration state of a partially supplied demand so a follow-up demand can resume it.
+     * <p>
+     * If the affinity topology version no longer equals {@code topVer}, the context is not saved and
+     * every resource it would have owned is released instead: the entry iterator is closed (if it is a
+     * {@link GridCloseableIterator}), the swap/off-heap listener is unregistered and the partition
+     * reservation is released.
+     *
+     * @param t Supply context key (node id, index, topology version).
+     * @param phase Phase.
+     * @param partIt Partition iterator.
+     * @param part Partition.
+     * @param entryIt Entry iterator of the current phase.
+     * @param swapLsnr Swap listener, may be {@code null}.
+     * @param loc Reserved local partition, may be {@code null}.
+     * @param topVer Demanded topology version.
+     * @param updateSeq Update sequence of the demand.
+     */
+    private void saveSupplyContext(
+        T3<UUID, Integer, AffinityTopologyVersion> t,
+        SupplyContextPhase phase,
+        Iterator<Integer> partIt,
+        int part,
+        Iterator<?> entryIt,
+        GridCacheEntryInfoCollectSwapListener swapLsnr,
+        GridDhtLocalPartition loc,
+        AffinityTopologyVersion topVer,
+        long updateSeq) {
+        synchronized (scMap) {
+            if (cctx.affinity().affinityTopologyVersion().equals(topVer)) {
+                assert scMap.get(t) == null;
+
+                scMap.put(t,
+                    new SupplyContext(phase,
+                        partIt,
+                        entryIt,
+                        swapLsnr,
+                        part,
+                        loc,
+                        updateSeq));
+            }
+            else {
+                // Topology changed: this context will never be resumed, and the caller has already
+                // handed its resources over, so release all of them here.
+                if (entryIt instanceof GridCloseableIterator) {
+                    GridCloseableIterator<?> closeable = (GridCloseableIterator<?>)entryIt;
+
+                    if (!closeable.isClosed()) {
+                        try {
+                            closeable.close();
+                        }
+                        catch (IgniteCheckedException e) {
+                            log.error("Iterator close failed.", e);
+                        }
+                    }
+                }
+
+                if (swapLsnr != null) {
+                    cctx.swap().removeOffHeapListener(part, swapLsnr);
+                    cctx.swap().removeSwapListener(part, swapLsnr);
+                }
+
+                if (loc != null) {
+                    assert loc.reservations() > 0;
+
+                    loc.release();
+                }
+            }
+        }
+    }
+
+    /**
+     * Stage reached while supplying a single partition.
+     */
+    private enum SupplyContextPhase {
+        /** Partition not started; the swap listener is registered when leaving this phase. */
+        NEW,
+
+        /** Iterating on-heap entries of the partition. */
+        ONHEAP,
+
+        /** Iterating swap/off-heap entries of the partition. */
+        SWAP,
+
+        /** Sending the entries collected by the swap listener during the previous phases. */
+        EVICTED
+    }
+
+    /**
+     * Iteration state of a partially supplied demand, kept between demand requests.
+     */
+    private static class SupplyContext {
+        /** Phase of the partition being supplied. */
+        private final SupplyContextPhase phase;
+
+        /** Iterator over the remaining demanded partitions. */
+        private final Iterator<Integer> partIt;
+
+        /** Iterator of the current phase (type depends on the phase). */
+        private final Iterator<?> entryIt;
+
+        /** Swap listener registered for {@link #part}, or {@code null}. */
+        private final GridCacheEntryInfoCollectSwapListener swapLsnr;
+
+        /** Partition being supplied. */
+        private final int part;
+
+        /** Reserved local partition. */
+        private final GridDhtLocalPartition loc;
+
+        /** Update sequence of the demand that created this context. */
+        private final long updateSeq;
+
+        /**
+         * @param phase Phase.
+         * @param partIt Partition iterator.
+         * @param entryIt Entry iterator.
+         * @param swapLsnr Swap listener.
+         * @param part Partition.
+         * @param loc Reserved local partition.
+         * @param updateSeq Update sequence.
+         */
+        public SupplyContext(SupplyContextPhase phase,
+            Iterator<Integer> partIt,
+            Iterator<?> entryIt,
+            GridCacheEntryInfoCollectSwapListener swapLsnr,
+            int part,
+            GridDhtLocalPartition loc,
+            long updateSeq) {
+            this.part = part;
+            this.loc = loc;
+            this.phase = phase;
+            this.partIt = partIt;
+            this.entryIt = entryIt;
+            this.swapLsnr = swapLsnr;
+            this.updateSeq = updateSeq;
+        }
+    }
+
+ @Deprecated//Backward compatibility. To be removed in future.
+ public void startOldListeners() {
+ if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) {
+ cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
+ @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+ processOldDemandMessage(m, id);
+ }
+ });
+ }
+ }
+
+ @Deprecated//Backward compatibility. To be removed in future.
+ public void stopOldListeners() {
+ if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) {
+
+ cctx.io().removeHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class);
+ }
+ }
+
+    /**
+     * Handles a demand message of the legacy rebalancing protocol. Requested partitions are supplied
+     * within this single call: on-heap entries, swap/off-heap entries, then the entries collected by the
+     * swap listener. Once an intermediate batch has been sent, the message completing the current
+     * partition is marked for acknowledgement and no further partitions are processed.
+     *
+     * @param d Demand message.
+     * @param id Demanding node id.
+     * @deprecated Backward compatibility. To be removed in future.
+     */
+    @Deprecated
+    private void processOldDemandMessage(GridDhtPartitionDemandMessage d, UUID id) {
+        ClusterNode node = cctx.node(id);
+
+        // The demander may already have left; the affinity checks, replies and logging below
+        // all dereference the node.
+        if (node == null) {
+            if (log.isDebugEnabled())
+                log.debug("Demanding node left grid, ignoring demand message: " + id);
+
+            return;
+        }
+
+        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
+            d.updateSequence(), cctx.cacheId(), cctx.deploymentEnabled());
+
+        long preloadThrottle = cctx.config().getRebalanceThrottle();
+
+        boolean ack = false;
+
+        try {
+            for (int part : d.partitions()) {
+                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
+
+                if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+                    // Reply with partition of "-1" to let sender know that
+                    // this node is no longer an owner.
+                    s.missed(part);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Requested partition is not owned by local node [part=" + part +
+                            ", demander=" + id + ']');
+
+                    continue;
+                }
+
+                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
+
+                try {
+                    if (cctx.isSwapOrOffheapEnabled()) {
+                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+
+                        cctx.swap().addOffHeapListener(part, swapLsnr);
+                        cctx.swap().addSwapListener(part, swapLsnr);
+                    }
+
+                    boolean partMissing = false;
+
+                    for (GridCacheEntryEx e : loc.entries()) {
+                        if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                            // Demander no longer needs this partition, so we send '-1' partition and move on.
+                            s.missed(part);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Demanding node does not need requested partition [part=" + part +
+                                    ", nodeId=" + id + ']');
+
+                            partMissing = true;
+
+                            break;
+                        }
+
+                        if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                            ack = true;
+
+                            if (!replyOld(node, d, s))
+                                return;
+
+                            // Throttle preloading.
+                            if (preloadThrottle > 0)
+                                U.sleep(preloadThrottle);
+
+                            s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+                                cctx.cacheId(), cctx.deploymentEnabled());
+                        }
+
+                        GridCacheEntryInfo info = e.info();
+
+                        if (info != null && !info.isNew()) {
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    if (partMissing)
+                        continue;
+
+                    if (cctx.isSwapOrOffheapEnabled()) {
+                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+                            cctx.swap().iterator(part);
+
+                        // Iterator may be null if space does not exist.
+                        if (iter != null) {
+                            try {
+                                boolean prepared = false;
+
+                                for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
+                                    if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                        // Demander no longer needs this partition,
+                                        // so we send '-1' partition and move on.
+                                        s.missed(part);
+
+                                        if (log.isDebugEnabled())
+                                            log.debug("Demanding node does not need requested partition " +
+                                                "[part=" + part + ", nodeId=" + id + ']');
+
+                                        partMissing = true;
+
+                                        break; // For.
+                                    }
+
+                                    if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                        ack = true;
+
+                                        if (!replyOld(node, d, s))
+                                            return;
+
+                                        // Throttle preloading.
+                                        if (preloadThrottle > 0)
+                                            U.sleep(preloadThrottle);
+
+                                        s = new GridDhtPartitionSupplyMessage(d.workerId(),
+                                            d.updateSequence(), cctx.cacheId(), cctx.deploymentEnabled());
+                                    }
+
+                                    GridCacheSwapEntry swapEntry = e.getValue();
+
+                                    GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+                                    info.keyBytes(e.getKey());
+                                    info.ttl(swapEntry.ttl());
+                                    info.expireTime(swapEntry.expireTime());
+                                    info.version(swapEntry.version());
+                                    info.value(swapEntry.value());
+
+                                    if (preloadPred == null || preloadPred.apply(info))
+                                        s.addEntry0(part, info, cctx);
+                                    else {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Rebalance predicate evaluated to false (will not send " +
+                                                "cache entry): " + info);
+
+                                        continue;
+                                    }
+
+                                    // Need to manually prepare cache message.
+                                    if (depEnabled && !prepared) {
+                                        ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+                                            cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+                                            swapEntry.valueClassLoaderId() != null ?
+                                                cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+                                                null;
+
+                                        if (ldr == null)
+                                            continue;
+
+                                        if (ldr instanceof GridDeploymentInfo) {
+                                            s.prepare((GridDeploymentInfo)ldr);
+
+                                            prepared = true;
+                                        }
+                                    }
+                                }
+
+                                if (partMissing)
+                                    continue;
+                            }
+                            finally {
+                                iter.close();
+                            }
+                        }
+                    }
+
+                    // Stop receiving promote notifications.
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+
+                    if (swapLsnr != null) {
+                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+
+                        swapLsnr = null;
+
+                        for (GridCacheEntryInfo info : entries) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition,
+                                // so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition " +
+                                        "[part=" + part + ", nodeId=" + id + ']');
+
+                                // No need to continue iteration over swap entries.
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                ack = true;
+
+                                if (!replyOld(node, d, s))
+                                    return;
+
+                                s = new GridDhtPartitionSupplyMessage(d.workerId(),
+                                    d.updateSequence(),
+                                    cctx.cacheId(),
+                                    cctx.deploymentEnabled());
+                            }
+
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    // Mark as last supply message.
+                    s.last(part);
+
+                    if (ack) {
+                        s.markAck();
+
+                        break; // Partition for loop.
+                    }
+                }
+                finally {
+                    loc.release();
+
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+                }
+            }
+
+            replyOld(node, d, s);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send partition supply message to node: " + id, e);
+        }
+        catch (IgniteSpiException e) {
+            // Same handling as the current protocol: typically the local node is stopping.
+            if (log.isDebugEnabled())
+                log.debug("Failed to send message to node (current node is stopping?) [node=" + id +
+                    ", msg=" + e.getMessage() + ']');
+        }
+    }
+
+    /**
+     * Sends a legacy supply message to the demanding node.
+     *
+     * @param n Node.
+     * @param d Demand message.
+     * @param s Supply message.
+     * @return {@code True} if message was sent, {@code false} if recipient left grid.
+     * @throws IgniteCheckedException If failed.
+     * @deprecated Backward compatibility. To be removed in future.
+     */
+    @Deprecated
+    private boolean replyOld(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
+        throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+
+        try {
+            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+        }
+        catch (ClusterTopologyCheckedException ignore) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send partition supply message because node left grid: " + n.id());
+
+            return false;
+        }
+
+        return true;
+    }
+
+ /**
+ * Dumps debug information.
+ */
+ public void dumpDebugInfo() {
+ synchronized (scMap) {
+ if (!scMap.isEmpty()) {
+ U.warn(log, "Rebalancing supplier reserved following partitions:");
+
+ for (SupplyContext sc : scMap.values()) {
+ if (sc.loc != null)
+ U.warn(log, ">>> " + sc.loc);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
new file mode 100644
index 0000000..41454f9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Partition supply message.
+ * <p>
+ * Carries entries of a single cache grouped by partition ID, together with the set of partitions
+ * that have been fully sent and the set of partitions which were not found. While data is added,
+ * an approximate size ({@code messageSize()}) is accumulated; it is not serialized.
+ */
+public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements GridCacheDeployable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Update sequence. */
+ private long updateSeq;
+
+ /** Topology version. */
+ private AffinityTopologyVersion topVer;
+
+ /** Partitions that have been fully sent. */
+ @GridDirectCollection(int.class)
+ private Collection<Integer> last;
+
+ /** Partitions which were not found. */
+ @GridToStringInclude
+ @GridDirectCollection(int.class)
+ private Collection<Integer> missed;
+
+ /** Entries, grouped by partition ID. */
+ @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class)
+ private Map<Integer, CacheEntryInfoCollection> infos;
+
+ /** Approximate message size accumulated while adding data (not serialized). */
+ @GridDirectTransient
+ private int msgSize;
+
+ /**
+ * @param updateSeq Update sequence for this node.
+ * @param cacheId Cache ID.
+ * @param topVer Topology version.
+ * @param addDepInfo Deployment info flag.
+ */
+ GridDhtPartitionSupplyMessageV2(long updateSeq, int cacheId, AffinityTopologyVersion topVer, boolean addDepInfo) {
+ this.cacheId = cacheId;
+ this.updateSeq = updateSeq;
+ this.topVer = topVer;
+ this.addDepInfo = addDepInfo;
+ }
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ public GridDhtPartitionSupplyMessageV2() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean ignoreClassErrors() {
+ return true;
+ }
+
+ /**
+ * @return Update sequence.
+ */
+ long updateSequence() {
+ return updateSeq;
+ }
+
+ /**
+ * @return Topology version this supply message was created for.
+ */
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return Partitions that have been fully sent (empty collection if none).
+ */
+ Collection<Integer> last() {
+ return last == null ? Collections.<Integer>emptySet() : last;
+ }
+
+ /**
+ * Marks partition as fully sent. If no entries were added for it, an empty entry collection
+ * is registered so that the partition is still present in {@code infos()}.
+ *
+ * @param p Partition which was fully sent.
+ */
+ void last(int p) {
+ if (last == null)
+ last = new HashSet<>();
+
+ if (last.add(p)) {
+ msgSize += 4;
+
+ // If partition is empty, we need to add it.
+ if (!infos().containsKey(p)) {
+ CacheEntryInfoCollection infoCol = new CacheEntryInfoCollection();
+
+ infoCol.init();
+
+ infos().put(p, infoCol);
+ }
+ }
+ }
+
+ /**
+ * @param p Missed partition.
+ */
+ void missed(int p) {
+ if (missed == null)
+ missed = new HashSet<>();
+
+ if (missed.add(p))
+ msgSize += 4;
+ }
+
+ /**
+ * @return Missed partitions (empty collection if none).
+ */
+ Collection<Integer> missed() {
+ return missed == null ? Collections.<Integer>emptySet() : missed;
+ }
+
+ /**
+ * @return Entries grouped by partition ID (the map is created on first access).
+ */
+ Map<Integer, CacheEntryInfoCollection> infos() {
+ if (infos == null)
+ infos = new HashMap<>();
+
+ return infos;
+ }
+
+ /**
+ * @return Approximate message size accumulated so far.
+ */
+ int messageSize() {
+ return msgSize;
+ }
+
+ /**
+ * Marshals the entry, adds its marshalled size to the message size estimate and appends it
+ * to the entry collection of partition {@code p}, creating that collection if needed.
+ *
+ * @param p Partition.
+ * @param info Entry to add.
+ * @param ctx Cache context.
+ * @throws IgniteCheckedException If failed.
+ */
+ void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+ assert info != null;
+
+ // Need to call this method to initialize info properly.
+ marshalInfo(info, ctx);
+
+ msgSize += info.marshalledSize(ctx);
+
+ CacheEntryInfoCollection infoCol = infos().get(p);
+
+ if (infoCol == null) {
+ // New partition: count its int key in the size estimate.
+ msgSize += 4;
+
+ infos().put(p, infoCol = new CacheEntryInfoCollection());
+
+ infoCol.init();
+ }
+
+ infoCol.add(info);
+ }
+
+ /**
+ * Same as {@link #addEntry(int, GridCacheEntryInfo, GridCacheContext)}, but additionally
+ * asserts that the entry has a key (object or bytes) and a value.
+ *
+ * @param p Partition.
+ * @param info Entry to add.
+ * @param ctx Cache context.
+ * @throws IgniteCheckedException If failed.
+ */
+ void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+ assert info != null;
+ assert (info.key() != null || info.keyBytes() != null);
+ assert info.value() != null;
+
+ addEntry(p, info, ctx);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
+
+ // Unmarshal every received entry using the context of the cache this message belongs to.
+ for (CacheEntryInfoCollection col : infos().values()) {
+ List<GridCacheEntryInfo> entries = col.infos();
+
+ for (int i = 0; i < entries.size(); i++)
+ entries.get(i).unmarshal(cacheCtx, ldr);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return addDepInfo;
+ }
+
+ /**
+ * Does not allocate the entry map, so it is safe to call before serialization.
+ *
+ * @return Number of partitions that have an entry collection in this message
+ * (not the total number of entries).
+ */
+ public int size() {
+ return infos == null ? 0 : infos.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ // Cases intentionally fall through: writing resumes from the saved state after a partial write.
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeLong("updateSeq", updateSeq))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ // Cases intentionally fall through: reading resumes from the saved state after a partial read.
+ switch (reader.state()) {
+ case 3:
+ infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ last = reader.readCollection("last", MessageCollectionItemType.INT);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ missed = reader.readCollection("missed", MessageCollectionItemType.INT);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ updateSeq = reader.readLong("updateSeq");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridDhtPartitionSupplyMessageV2.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 114;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 8;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ // Read the field directly so that logging never allocates the (serialized) entry map.
+ return S.toString(GridDhtPartitionSupplyMessageV2.class, this,
+ "size", size(),
+ "parts", infos == null ? Collections.<Integer>emptySet() : infos.keySet(),
+ "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
deleted file mode 100644
index 28a73b1..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ /dev/null
@@ -1,555 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import java.io.Externalizable;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.locks.ReadWriteLock;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryInfoCollectSwapListener;
-import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import org.apache.ignite.internal.util.lang.GridCloseableIterator;
-import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.thread.IgniteThread;
-import org.jetbrains.annotations.Nullable;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
-
-/**
- * Thread pool for supplying partitions to demanding nodes.
- * <p>
- * Demand messages received by the handler registered in the constructor are queued and then
- * processed by {@code SupplyWorker} threads, each of which answers with one or more
- * {@link GridDhtPartitionSupplyMessage} batches.
- */
-class GridDhtPartitionSupplyPool {
- /** Cache context. */
- private final GridCacheContext<?, ?> cctx;
-
- /** Logger. */
- private final IgniteLogger log;
-
- /** Shutdown lock; its read lock is held while an incoming demand message is being queued. */
- private final ReadWriteLock busyLock;
-
- /** Partition topology of this cache (set to {@code null} on stop). */
- private GridDhtPartitionTopology top;
-
- /** Supply workers (none on client nodes or when rebalancing is disabled). */
- private final Collection<SupplyWorker> workers = new LinkedList<>();
-
- /** Demand messages waiting to be processed by supply workers. */
- private final BlockingQueue<DemandMessage> queue = new LinkedBlockingDeque<>();
-
- /** Whether deployment is enabled ({@code cctx.gridDeploy().enabled()}). */
- private final boolean depEnabled;
-
- /** Preload predicate. */
- private IgnitePredicate<GridCacheEntryInfo> preloadPred;
-
- /**
- * On non-client nodes, creates one supply worker per configured rebalance thread (none if
- * rebalancing is disabled) and registers the demand message handler.
- *
- * @param cctx Cache context.
- * @param busyLock Shutdown lock.
- */
- GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
- assert cctx != null;
- assert busyLock != null;
-
- this.cctx = cctx;
- this.busyLock = busyLock;
-
- log = cctx.logger(getClass());
-
- top = cctx.dht().topology();
-
- if (!cctx.kernalContext().clientNode()) {
- int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
-
- for (int i = 0; i < poolSize; i++)
- workers.add(new SupplyWorker());
-
- cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
- @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
- processDemandMessage(id, m);
- }
- });
- }
-
- depEnabled = cctx.gridDeploy().enabled();
- }
-
- /**
- * Starts a thread for every supply worker.
- */
- void start() {
- for (SupplyWorker w : workers)
- new IgniteThread(cctx.gridName(), "preloader-supply-worker", w).start();
- }
-
- /**
- * Cancels and joins all supply workers, then clears the topology reference.
- */
- void stop() {
- U.cancel(workers);
- U.join(workers, log);
-
- top = null;
- }
-
- /**
- * Sets preload predicate for supply pool.
- *
- * @param preloadPred Preload predicate.
- */
- void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
- this.preloadPred = preloadPred;
- }
-
- /**
- * @return Size of this thread pool as configured (may differ from the number of workers actually created).
- */
- int poolSize() {
- return cctx.config().getRebalanceThreadPoolSize();
- }
-
- /**
- * Tries to acquire the read lock of the busy lock without blocking.
- *
- * @return {@code true} if entered to busy state.
- */
- private boolean enterBusy() {
- if (busyLock.readLock().tryLock())
- return true;
-
- if (log.isDebugEnabled())
- log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId());
-
- return false;
- }
-
- /**
- * Queues a demand message for the supply workers, or ignores it if rebalancing is disabled.
- *
- * @param nodeId Sender node ID.
- * @param d Message.
- */
- private void processDemandMessage(UUID nodeId, GridDhtPartitionDemandMessage d) {
- if (!enterBusy())
- return;
-
- try {
- if (cctx.rebalanceEnabled()) {
- if (log.isDebugEnabled())
- log.debug("Received partition demand [node=" + nodeId + ", demand=" + d + ']');
-
- queue.offer(new DemandMessage(nodeId, d));
- }
- else
- U.warn(log, "Received partition demand message when rebalancing is disabled (will ignore): " + d);
- }
- finally {
- leaveBusy();
- }
- }
-
- /**
- * Releases the read lock acquired by {@link #enterBusy()}.
- */
- private void leaveBusy() {
- busyLock.readLock().unlock();
- }
-
- /**
- * Polls the queue, waiting at most 2000 ms; returns {@code null} on timeout.
- *
- * @param deque Deque to poll from.
- * @param w Worker.
- * @return Polled item.
- * @throws InterruptedException If interrupted.
- */
- @Nullable private <T> T poll(BlockingQueue<T> deque, GridWorker w) throws InterruptedException {
- assert w != null;
-
- // There is currently a case where {@code interrupted}
- // flag on a thread gets flipped during stop which causes the pool to hang. This check
- // will always make sure that interrupted flag gets reset before going into wait conditions.
- // The true fix should actually make sure that interrupted flag does not get reset or that
- // interrupted exception gets propagated. Until we find a real fix, this method should
- // always work to make sure that there is no hanging during stop.
- if (w.isCancelled())
- Thread.currentThread().interrupt();
-
- return deque.poll(2000, MILLISECONDS);
- }
-
- /**
- * Supply worker: takes demand messages from the queue and answers them until cancelled.
- */
- private class SupplyWorker extends GridWorker {
- /** Hide worker logger and use cache logger. */
- private IgniteLogger log = GridDhtPartitionSupplyPool.this.log;
-
- /**
- * Default constructor.
- */
- private SupplyWorker() {
- super(cctx.gridName(), "preloader-supply-worker", GridDhtPartitionSupplyPool.this.log);
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- while (!isCancelled()) {
- DemandMessage msg = poll(queue, this);
-
- if (msg == null)
- continue;
-
- ClusterNode node = cctx.discovery().node(msg.senderId());
-
- if (node == null) {
- if (log.isDebugEnabled())
- log.debug("Received message from non-existing node (will ignore): " + msg);
-
- continue;
- }
-
- processMessage(msg, node);
- }
- }
-
- /**
- * Answers a single demand message. For each demanded partition that is owned and can be
- * reserved locally, sends its in-memory entries, then (if swap or off-heap is enabled) its
- * swap entries, then the entries collected by the swap listener during iteration. Entries are
- * batched into supply messages of about the configured rebalance batch size. If any
- * intermediate batch was sent, the current message is marked as ack after the current
- * partition and the remaining partitions are not processed in this call.
- *
- * @param msg Message.
- * @param node Demander.
- */
- private void processMessage(DemandMessage msg, ClusterNode node) {
- assert msg != null;
- assert node != null;
-
- GridDhtPartitionDemandMessage d = msg.message();
-
- GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
- d.updateSequence(), cctx.cacheId(), cctx.deploymentEnabled());
-
- long preloadThrottle = cctx.config().getRebalanceThrottle();
-
- boolean ack = false; // Set once any intermediate batch has been sent.
-
- try {
- for (int part : d.partitions()) {
- GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
-
- if (loc == null || loc.state() != OWNING || !loc.reserve()) {
- // Reply with partition of "-1" to let sender know that
- // this node is no longer an owner.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Requested partition is not owned by local node [part=" + part +
- ", demander=" + msg.senderId() + ']');
-
- continue;
- }
-
- GridCacheEntryInfoCollectSwapListener swapLsnr = null;
-
- try {
- if (cctx.isSwapOrOffheapEnabled()) {
- swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
-
- cctx.swap().addOffHeapListener(part, swapLsnr);
- cctx.swap().addSwapListener(part, swapLsnr);
- }
-
- boolean partMissing = false;
-
- for (GridCacheEntryEx e : loc.entries()) {
- if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition, so we send '-1' partition and move on.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Demanding node does not need requested partition [part=" + part +
- ", nodeId=" + msg.senderId() + ']');
-
- partMissing = true;
-
- break;
- }
-
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- ack = true;
-
- if (!reply(node, d, s))
- return;
-
- // Throttle preloading.
- if (preloadThrottle > 0)
- U.sleep(preloadThrottle);
-
- s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
- cctx.cacheId(), cctx.deploymentEnabled());
- }
-
- GridCacheEntryInfo info = e.info();
-
- if (info != null && !info.isNew()) {
- if (preloadPred == null || preloadPred.apply(info))
- s.addEntry(part, info, cctx);
- else if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
- info);
- }
- }
-
- if (partMissing)
- continue;
-
- if (cctx.isSwapOrOffheapEnabled()) {
- GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
- cctx.swap().iterator(part);
-
- // Iterator may be null if space does not exist.
- if (iter != null) {
- try {
- boolean prepared = false;
-
- for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
- if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition,
- // so we send '-1' partition and move on.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Demanding node does not need requested partition " +
- "[part=" + part + ", nodeId=" + msg.senderId() + ']');
-
- partMissing = true;
-
- break; // For.
- }
-
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- ack = true;
-
- if (!reply(node, d, s))
- return;
-
- // Throttle preloading.
- if (preloadThrottle > 0)
- U.sleep(preloadThrottle);
-
- s = new GridDhtPartitionSupplyMessage(d.workerId(),
- d.updateSequence(), cctx.cacheId(), cctx.deploymentEnabled());
- }
-
- GridCacheSwapEntry swapEntry = e.getValue();
-
- GridCacheEntryInfo info = new GridCacheEntryInfo();
-
- info.keyBytes(e.getKey());
- info.ttl(swapEntry.ttl());
- info.expireTime(swapEntry.expireTime());
- info.version(swapEntry.version());
- info.value(swapEntry.value());
-
- if (preloadPred == null || preloadPred.apply(info))
- s.addEntry0(part, info, cctx);
- else {
- if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false (will not send " +
- "cache entry): " + info);
-
- continue;
- }
-
- // Need to manually prepare cache message.
- if (depEnabled && !prepared) {
- ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
- cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
- swapEntry.valueClassLoaderId() != null ?
- cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
- null;
-
- if (ldr == null)
- continue; // Entry is already added; only deployment preparation is skipped.
-
- if (ldr instanceof GridDeploymentInfo) {
- s.prepare((GridDeploymentInfo)ldr);
-
- prepared = true;
- }
- }
- }
-
- if (partMissing)
- continue;
- }
- finally {
- iter.close();
- }
- }
- }
-
- // Stop receiving promote notifications.
- if (swapLsnr != null) {
- cctx.swap().removeOffHeapListener(part, swapLsnr);
- cctx.swap().removeSwapListener(part, swapLsnr);
- }
-
- if (swapLsnr != null) {
- Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
-
- swapLsnr = null; // Listeners were removed above; prevents removing them again in finally.
-
- for (GridCacheEntryInfo info : entries) {
- if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition,
- // so we send '-1' partition and move on.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Demanding node does not need requested partition " +
- "[part=" + part + ", nodeId=" + msg.senderId() + ']');
-
- // No need to continue iteration over swap entries.
- break;
- }
-
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- ack = true;
-
- if (!reply(node, d, s))
- return;
-
- s = new GridDhtPartitionSupplyMessage(d.workerId(),
- d.updateSequence(),
- cctx.cacheId(), cctx.deploymentEnabled());
- }
-
- if (preloadPred == null || preloadPred.apply(info))
- s.addEntry(part, info, cctx);
- else if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
- info);
- }
- }
-
- // Mark as last supply message.
- s.last(part);
-
- if (ack) {
- s.markAck();
-
- break; // Partition for loop.
- }
- }
- finally {
- loc.release();
-
- if (swapLsnr != null) {
- cctx.swap().removeOffHeapListener(part, swapLsnr);
- cctx.swap().removeSwapListener(part, swapLsnr);
- }
- }
- }
-
- reply(node, d, s);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send partition supply message to node: " + node.id(), e);
- }
- }
-
- /**
- * Sends the supply message to the demander as an ordered message on the demand message's topic.
- *
- * @param n Node.
- * @param d Demand message.
- * @param s Supply message.
- * @return {@code True} if message was sent, {@code false} if recipient left grid.
- * @throws IgniteCheckedException If failed.
- */
- private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
- throws IgniteCheckedException {
- try {
- if (log.isDebugEnabled())
- log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
-
- cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
-
- return true;
- }
- catch (ClusterTopologyCheckedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Failed to send partition supply message because node left grid: " + n.id());
-
- return false;
- }
- }
- }
-
- /**
- * Demand message wrapper: pairs the sender node ID with the received demand message.
- */
- private static class DemandMessage extends IgniteBiTuple<UUID, GridDhtPartitionDemandMessage> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * @param sndId Sender ID.
- * @param msg Message.
- */
- DemandMessage(UUID sndId, GridDhtPartitionDemandMessage msg) {
- super(sndId, msg);
- }
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public DemandMessage() {
- // No-op.
- }
-
- /**
- * @return Sender ID.
- */
- UUID senderId() {
- return get1();
- }
-
- /**
- * @return Message.
- */
- public GridDhtPartitionDemandMessage message() {
- return get2();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return "DemandMessage [senderId=" + senderId() + ", msg=" + message() + ']';
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index cef38e8..2f2944d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -742,6 +742,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
// Must initialize topology after we get discovery event.
initTopology(cacheCtx);
+ cacheCtx.preloader().onTopologyChanged(exchId.topologyVersion());
+
cacheCtx.preloader().updateLastExchangeFuture(this);
}